Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:46:00 UTC

[4/7] spark git commit: [SPARK-5654] Integrate SparkR

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
new file mode 100644
index 0000000..2fc0bb2
--- /dev/null
+++ b/R/pkg/R/context.R
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# context.R: SparkContext driven functions
+
+getMinSplits <- function(sc, minSplits) {
+  if (is.null(minSplits)) {
+    defaultParallelism <- callJMethod(sc, "defaultParallelism")
+    minSplits <- min(defaultParallelism, 2)
+  }
+  as.integer(minSplits)
+}
+
+#' Create an RDD from a text file.
+#'
+#' This function reads a text file from HDFS, a local file system (available on all
+#' nodes), or any Hadoop-supported file system URI, and creates an
+#' RDD of strings from it.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#'  value is chosen based on available parallelism.
+#' @return RDD where each item is of type \code{character}
+#' @export
+#' @examples
+#'\dontrun{
+#'  sc <- sparkR.init()
+#'  lines <- textFile(sc, "myfile.txt")
+#'}
+textFile <- function(sc, path, minSplits = NULL) {
+  # Allow the user to have a more flexible definition of the text file path
+  path <- suppressWarnings(normalizePath(path))
+  # Convert a string vector of paths to a string containing comma-separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits))
+  # jrdd is of type JavaRDD[String]
+  RDD(jrdd, "string")
+}
+
+#' Load an RDD saved as a SequenceFile containing serialized objects.
+#'
+#' The file to be loaded should be one that was previously generated by calling
+#' saveAsObjectFile() of the RDD class.
+#'
+#' @param sc SparkContext to use
+#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param minSplits Minimum number of splits to be created. If NULL, the default
+#'  value is chosen based on available parallelism.
+#' @return RDD containing serialized R objects.
+#' @seealso saveAsObjectFile
+#' @export
+#' @examples
+#'\dontrun{
+#'  sc <- sparkR.init()
+#'  rdd <- objectFile(sc, "myfile")
+#'}
+objectFile <- function(sc, path, minSplits = NULL) {
+  # Allow the user to have a more flexible definition of the text file path
+  path <- suppressWarnings(normalizePath(path))
+  # Convert a string vector of paths to a string containing comma-separated paths
+  path <- paste(path, collapse = ",")
+
+  jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits))
+  # Assume the RDD contains serialized R objects.
+  RDD(jrdd, "byte")
+}
+
+#' Create an RDD from a homogeneous list or vector.
+#'
+#' This function creates an RDD from a local homogeneous list in R. The elements
+#' in the list are split into \code{numSlices} slices and distributed to nodes
+#' in the cluster.
+#'
+#' @param sc SparkContext to use
+#' @param coll collection to parallelize
+#' @param numSlices number of partitions to create in the RDD
+#' @return an RDD created from this collection
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10, 2)
+#' # The RDD should contain 10 elements
+#' length(rdd)
+#'}
+parallelize <- function(sc, coll, numSlices = 1) {
+  # TODO: bound/safeguard numSlices
+  # TODO: unit tests for if the split works for all primitives
+  # TODO: support matrix, data frame, etc
+  if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
+    if (is.data.frame(coll)) {
+      message(paste("context.R: A data frame is parallelized by columns."))
+    } else {
+      if (is.matrix(coll)) {
+        message(paste("context.R: A matrix is parallelized by elements."))
+      } else {
+        message(paste("context.R: parallelize() currently only supports lists and vectors.",
+                      "Calling as.list() to coerce coll into a list."))
+      }
+    }
+    coll <- as.list(coll)
+  }
+
+  if (numSlices > length(coll))
+    numSlices <- length(coll)
+
+  sliceLen <- ceiling(length(coll) / numSlices)
+  slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
+
+  # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
+  # 2-tuples of raws
+  serializedSlices <- lapply(slices, serialize, connection = NULL)
+
+  jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
+                      "createRDDFromArray", sc, serializedSlices)
+
+  RDD(jrdd, "byte")
+}
+
+#' Include this specified package on all workers
+#'
+#' This function can be used to include a package on all workers before the
+#' user's code is executed. This is useful in scenarios where other R package
+#' functions are used in a function passed to functions like \code{lapply}.
+#' NOTE: The package is assumed to be installed on every node in the Spark
+#' cluster.
+#'
+#' @param sc SparkContext to use
+#' @param pkg Package name
+#'
+#' @export
+#' @examples
+#'\dontrun{
+#'  library(Matrix)
+#'
+#'  sc <- sparkR.init()
+#'  # Include the matrix library we will be using
+#'  includePackage(sc, Matrix)
+#'
+#'  generateSparse <- function(x) {
+#'    sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
+#'  }
+#'
+#'  rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
+#'  collect(rdd)
+#'}
+includePackage <- function(sc, pkg) {
+  pkg <- as.character(substitute(pkg))
+  if (exists(".packages", .sparkREnv)) {
+    packages <- .sparkREnv$.packages
+  } else {
+    packages <- list()
+  }
+  packages <- c(packages, pkg)
+  .sparkREnv$.packages <- packages
+}
+
+#' @title Broadcast a variable to all workers
+#'
+#' @description
+#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
+#' object for reading it in distributed functions.
+#'
+#' @param sc Spark Context to use
+#' @param object Object to be broadcast
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2, 2L)
+#'
+#' # Large Matrix object that we want to broadcast
+#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
+#' randomMatBr <- broadcast(sc, randomMat)
+#'
+#' # Use the broadcast variable inside the function
+#' useBroadcast <- function(x) {
+#'   sum(value(randomMatBr) * x)
+#' }
+#' sumRDD <- lapply(rdd, useBroadcast)
+#'}
+broadcast <- function(sc, object) {
+  objName <- as.character(substitute(object))
+  serializedObj <- serialize(object, connection = NULL)
+
+  jBroadcast <- callJMethod(sc, "broadcast", serializedObj)
+  id <- as.character(callJMethod(jBroadcast, "id"))
+
+  Broadcast(id, object, jBroadcast, objName)
+}
+
+#' @title Set the checkpoint directory
+#'
+#' Set the directory under which RDDs are going to be checkpointed. The
+#' directory must be a HDFS path if running on a cluster.
+#'
+#' @param sc Spark Context to use
+#' @param dirName Directory path
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' setCheckpointDir(sc, "~/checkpoints")
+#' rdd <- parallelize(sc, 1:2, 2L)
+#' checkpoint(rdd)
+#'}
+setCheckpointDir <- function(sc, dirName) {
+  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
+}
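
A minimal end-to-end sketch of how the context.R functions above fit together, assuming SparkR is installed and a local master is available (the data and names below are placeholders, not part of the patch):

# Sketch only; assumes a local Spark build with the SparkR package on the library path.
library(SparkR)

sc <- sparkR.init(master = "local")

# Distribute a local vector across 2 partitions and inspect it.
rdd <- parallelize(sc, 1:100, 2L)
count(rdd)                                   # 100

# Broadcast a small lookup table and use it inside a transformation.
weights <- c(2, 3)
weightsBr <- broadcast(sc, weights)
scaled <- lapply(rdd, function(x) { x * sum(value(weightsBr)) })
take(scaled, 3)                              # list(5, 10, 15)

sparkR.stop()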

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/deserialize.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
new file mode 100644
index 0000000..257b435
--- /dev/null
+++ b/R/pkg/R/deserialize.R
@@ -0,0 +1,184 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions to deserialize objects from Java.
+
+# Type mapping from Java to R
+# 
+# void -> NULL
+# Int -> integer
+# String -> character
+# Boolean -> logical
+# Double -> double
+# Long -> double
+# Array[Byte] -> raw
+# Date -> Date
+# Time -> POSIXct
+#
+# Array[T] -> list()
+# Object -> jobj
+
+readObject <- function(con) {
+  # Read type first
+  type <- readType(con)
+  readTypedObject(con, type)
+}
+
+readTypedObject <- function(con, type) {
+  switch (type,
+    "i" = readInt(con),
+    "c" = readString(con),
+    "b" = readBoolean(con),
+    "d" = readDouble(con),
+    "r" = readRaw(con),
+    "D" = readDate(con),
+    "t" = readTime(con),
+    "l" = readList(con),
+    "n" = NULL,
+    "j" = getJobj(readString(con)),
+    stop(paste("Unsupported type for deserialization", type)))
+}
+
+readString <- function(con) {
+  stringLen <- readInt(con)
+  string <- readBin(con, raw(), stringLen, endian = "big")
+  rawToChar(string)
+}
+
+readInt <- function(con) {
+  readBin(con, integer(), n = 1, endian = "big")
+}
+
+readDouble <- function(con) {
+  readBin(con, double(), n = 1, endian = "big")
+}
+
+readBoolean <- function(con) {
+  as.logical(readInt(con))
+}
+
+readType <- function(con) {
+  rawToChar(readBin(con, "raw", n = 1L))
+}
+
+readDate <- function(con) {
+  as.Date(readString(con))
+}
+
+readTime <- function(con) {
+  t <- readDouble(con)
+  as.POSIXct(t, origin = "1970-01-01")
+}
+
+# We only support lists where all elements are of the same type
+readList <- function(con) {
+  type <- readType(con)
+  len <- readInt(con)
+  if (len > 0) {
+    l <- vector("list", len)
+    for (i in 1:len) {
+      l[[i]] <- readTypedObject(con, type)
+    }
+    l
+  } else {
+    list()
+  }
+}
+
+readRaw <- function(con) {
+  dataLen <- readInt(con)
+  data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
+}
+
+readRawLen <- function(con, dataLen) {
+  data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
+}
+
+readDeserialize <- function(con) {
+  # Two cases are possible. In the first, the entire partition is
+  # encoded as a single byte array, so there is only one value to read;
+  # if so, just return firstData.
+  dataLen <- readInt(con)
+  firstData <- unserialize(
+      readBin(con, raw(), as.integer(dataLen), endian = "big"))
+
+  # Else, read things into a list
+  dataLen <- readInt(con)
+  if (length(dataLen) > 0 && dataLen > 0) {
+    data <- list(firstData)
+    while (length(dataLen) > 0 && dataLen > 0) {
+      data[[length(data) + 1L]] <- unserialize(
+          readBin(con, raw(), as.integer(dataLen), endian = "big"))
+      dataLen <- readInt(con)
+    }
+    unlist(data, recursive = FALSE)
+  } else {
+    firstData
+  }
+}
+
+readDeserializeRows <- function(inputCon) {
+  # readDeserializeRows will deserialize a DataOutputStream composed of
+  # a list of lists. Since the DOS is one continuous stream and
+  # the number of rows varies, we put the readRow function in a while loop
+  # that terminates when the next row is empty.
+  data <- list()
+  while(TRUE) {
+    row <- readRow(inputCon)
+    if (length(row) == 0) {
+      break
+    }
+    data[[length(data) + 1L]] <- row
+  }
+  data # this is a list of named lists now
+}
+
+readRowList <- function(obj) {
+  # readRowList is meant for use inside an lapply. As a result, it is
+  # necessary to open a standalone connection for the row and consume
+  # the numCols bytes inside the read function in order to correctly
+  # deserialize the row.
+  rawObj <- rawConnection(obj, "r+")
+  on.exit(close(rawObj))
+  readRow(rawObj)
+}
+
+readRow <- function(inputCon) {
+  numCols <- readInt(inputCon)
+  if (length(numCols) > 0 && numCols > 0) {
+    lapply(1:numCols, function(x) {
+      obj <- readObject(inputCon)
+      if (is.null(obj)) {
+        NA
+      } else {
+        obj
+      }
+    }) # each row is a list now
+  } else {
+    list()
+  }
+}
+
+# Take a single column as Array[Byte] and deserialize it into an atomic vector
+readCol <- function(inputCon, numRows) {
+  # sapply cannot work with POSIXlt
+  do.call(c, lapply(1:numRows, function(x) {
+    value <- readObject(inputCon)
+    # Replace NULL with NA so we can coerce to vectors
+    if (is.null(value)) NA else value
+  }))
+}
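
The read* helpers above assume a simple framing from the JVM side: a one-byte type tag followed by a big-endian payload, length-prefixed for strings and raw vectors. A standalone sketch that hand-builds such frames in memory and decodes them with readObject (it assumes the functions in deserialize.R have been sourced; no JVM is involved):

# Integer frame: tag "i", then a 4-byte big-endian payload.
buf <- rawConnection(raw(0), "r+")
writeBin(charToRaw("i"), buf)
writeBin(42L, buf, endian = "big")
seek(buf, 0)
readObject(buf)                       # 42L
close(buf)

# String frame: tag "c", then a 4-byte length, then the bytes themselves.
buf <- rawConnection(raw(0), "r+")
bytes <- charToRaw("spark")
writeBin(charToRaw("c"), buf)
writeBin(length(bytes), buf, endian = "big")
writeBin(bytes, buf)
seek(buf, 0)
readObject(buf)                       # "spark"
close(buf)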

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
new file mode 100644
index 0000000..5fb1cca
--- /dev/null
+++ b/R/pkg/R/generics.R
@@ -0,0 +1,543 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+############ RDD Actions and Transformations ############
+
+#' @rdname aggregateRDD
+#' @seealso reduce
+#' @export
+setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
+
+#' @rdname cache-methods
+#' @export
+setGeneric("cache", function(x) { standardGeneric("cache") })
+
+#' @rdname coalesce
+#' @seealso repartition
+#' @export
+setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+
+#' @rdname checkpoint-methods
+#' @export
+setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collect", function(x, ...) { standardGeneric("collect") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") })
+
+#' @rdname collect-methods
+#' @export
+setGeneric("collectPartition",
+           function(x, partitionId) {
+             standardGeneric("collectPartition")
+           })
+
+#' @rdname count
+#' @export
+setGeneric("count", function(x) { standardGeneric("count") })
+
+#' @rdname countByValue
+#' @export
+setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
+
+#' @rdname distinct
+#' @export
+setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") })
+
+#' @rdname filterRDD
+#' @export
+setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") })
+
+#' @rdname first
+#' @export
+setGeneric("first", function(x) { standardGeneric("first") })
+
+#' @rdname flatMap
+#' @export
+setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") })
+
+#' @rdname fold
+#' @seealso reduce
+#' @export
+setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") })
+
+#' @rdname foreach
+#' @export
+setGeneric("foreach", function(x, func) { standardGeneric("foreach") })
+
+#' @rdname foreach
+#' @export
+setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") })
+
+# The jrdd accessor function.
+setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
+
+#' @rdname glom
+#' @export
+setGeneric("glom", function(x) { standardGeneric("glom") })
+
+#' @rdname keyBy
+#' @export
+setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
+
+#' @rdname lapplyPartition
+#' @export
+setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @export
+setGeneric("lapplyPartitionsWithIndex",
+           function(X, FUN) {
+             standardGeneric("lapplyPartitionsWithIndex")
+           })
+
+#' @rdname lapply
+#' @export
+setGeneric("map", function(X, FUN) { standardGeneric("map") })
+
+#' @rdname lapplyPartition
+#' @export
+setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") })
+
+#' @rdname lapplyPartitionsWithIndex
+#' @export
+setGeneric("mapPartitionsWithIndex",
+           function(X, FUN) { standardGeneric("mapPartitionsWithIndex") })
+
+#' @rdname maximum
+#' @export
+setGeneric("maximum", function(x) { standardGeneric("maximum") })
+
+#' @rdname minimum
+#' @export
+setGeneric("minimum", function(x) { standardGeneric("minimum") })
+
+#' @rdname sumRDD 
+#' @export
+setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
+
+#' @rdname name
+#' @export
+setGeneric("name", function(x) { standardGeneric("name") })
+
+#' @rdname numPartitions
+#' @export
+setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })
+
+#' @rdname persist
+#' @export
+setGeneric("persist", function(x, newLevel) { standardGeneric("persist") })
+
+#' @rdname pipeRDD
+#' @export
+setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")})
+
+#' @rdname reduce
+#' @export
+setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
+
+#' @rdname repartition
+#' @seealso coalesce
+#' @export
+setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+
+#' @rdname sampleRDD
+#' @export
+setGeneric("sampleRDD",
+           function(x, withReplacement, fraction, seed) {
+             standardGeneric("sampleRDD")
+           })
+
+#' @rdname saveAsObjectFile
+#' @seealso objectFile
+#' @export
+setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") })
+
+#' @rdname saveAsTextFile
+#' @export
+setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") })
+
+#' @rdname setName
+#' @export
+setGeneric("setName", function(x, name) { standardGeneric("setName") })
+
+#' @rdname sortBy
+#' @export
+setGeneric("sortBy",
+           function(x, func, ascending = TRUE, numPartitions = 1L) {
+             standardGeneric("sortBy")
+           })
+
+#' @rdname take
+#' @export
+setGeneric("take", function(x, num) { standardGeneric("take") })
+
+#' @rdname takeOrdered
+#' @export
+setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") })
+
+#' @rdname takeSample
+#' @export
+setGeneric("takeSample",
+           function(x, withReplacement, num, seed) {
+             standardGeneric("takeSample")
+           })
+
+#' @rdname top
+#' @export
+setGeneric("top", function(x, num) { standardGeneric("top") })
+
+#' @rdname unionRDD
+#' @export
+setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") })
+
+#' @rdname unpersist-methods
+#' @export
+setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
+
+#' @rdname zipRDD
+#' @export
+setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
+
+#' @rdname zipWithIndex
+#' @seealso zipWithUniqueId
+#' @export
+setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") })
+
+#' @rdname zipWithUniqueId
+#' @seealso zipWithIndex
+#' @export
+setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") })
+
+
+############ Binary Functions #############
+
+#' @rdname countByKey
+#' @export
+setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
+
+#' @rdname flatMapValues
+#' @export
+setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
+
+#' @rdname keys
+#' @export
+setGeneric("keys", function(x) { standardGeneric("keys") })
+
+#' @rdname lookup
+#' @export
+setGeneric("lookup", function(x, key) { standardGeneric("lookup") })
+
+#' @rdname mapValues
+#' @export
+setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") })
+
+#' @rdname values
+#' @export
+setGeneric("values", function(x) { standardGeneric("values") })
+
+
+
+############ Shuffle Functions ############
+
+#' @rdname aggregateByKey
+#' @seealso foldByKey, combineByKey
+#' @export
+setGeneric("aggregateByKey",
+           function(x, zeroValue, seqOp, combOp, numPartitions) {
+             standardGeneric("aggregateByKey")
+           })
+
+#' @rdname cogroup
+#' @export
+setGeneric("cogroup",
+           function(..., numPartitions) {
+             standardGeneric("cogroup")
+           },
+           signature = "...")
+
+#' @rdname combineByKey
+#' @seealso groupByKey, reduceByKey
+#' @export
+setGeneric("combineByKey",
+           function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
+             standardGeneric("combineByKey")
+           })
+
+#' @rdname foldByKey
+#' @seealso aggregateByKey, combineByKey
+#' @export
+setGeneric("foldByKey",
+           function(x, zeroValue, func, numPartitions) {
+             standardGeneric("foldByKey")
+           })
+
+#' @rdname join-methods
+#' @export
+setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") })
+
+#' @rdname groupByKey
+#' @seealso reduceByKey
+#' @export
+setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") })
+
+#' @rdname join-methods
+#' @export
+setGeneric("join", function(x, y, ...) { standardGeneric("join") })
+
+#' @rdname join-methods
+#' @export
+setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
+
+#' @rdname partitionBy
+#' @export
+setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
+
+#' @rdname reduceByKey
+#' @seealso groupByKey
+#' @export
+setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")})
+
+#' @rdname reduceByKeyLocally
+#' @seealso reduceByKey
+#' @export
+setGeneric("reduceByKeyLocally",
+           function(x, combineFunc) {
+             standardGeneric("reduceByKeyLocally")
+           })
+
+#' @rdname join-methods
+#' @export
+setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") })
+
+#' @rdname sortByKey
+#' @export
+setGeneric("sortByKey", function(x, ascending = TRUE, numPartitions = 1L) {
+  standardGeneric("sortByKey")
+})
+
+
+################### Broadcast Variable Methods #################
+
+#' @rdname broadcast
+#' @export
+setGeneric("value", function(bcast) { standardGeneric("value") })
+
+
+
+####################  DataFrame Methods ########################
+
+#' @rdname schema
+#' @export
+setGeneric("columns", function(x) {standardGeneric("columns") })
+
+#' @rdname schema
+#' @export
+setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
+
+#' @rdname explain
+#' @export
+setGeneric("explain", function(x, ...) { standardGeneric("explain") })
+
+#' @rdname filter
+#' @export
+setGeneric("filter", function(x, condition) { standardGeneric("filter") })
+
+#' @rdname DataFrame
+#' @export
+setGeneric("groupBy", function(x, ...) { standardGeneric("groupBy") })
+
+#' @rdname insertInto
+#' @export
+setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") })
+
+#' @rdname intersect
+#' @export
+setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
+
+#' @rdname isLocal
+#' @export
+setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
+
+#' @rdname limit
+#' @export
+setGeneric("limit", function(x, num) {standardGeneric("limit") })
+
+#' @rdname sortDF
+#' @export
+setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
+
+#' @rdname schema
+#' @export
+setGeneric("printSchema", function(x) { standardGeneric("printSchema") })
+
+#' @rdname registerTempTable
+#' @export
+setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
+
+#' @rdname sampleDF
+#' @export
+setGeneric("sampleDF",
+           function(x, withReplacement, fraction, seed) {
+             standardGeneric("sampleDF")
+          })
+
+#' @rdname saveAsParquetFile
+#' @export
+setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
+
+#' @rdname saveAsTable
+#' @export
+setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
+  standardGeneric("saveAsTable")
+})
+
+#' @rdname saveAsTable
+#' @export
+setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
+
+#' @rdname schema
+#' @export
+setGeneric("schema", function(x) { standardGeneric("schema") })
+
+#' @rdname select
+#' @export
+setGeneric("select", function(x, col, ...) { standardGeneric("select") } )
+
+#' @rdname select
+#' @export
+setGeneric("selectExpr", function(x, expr, ...) { standardGeneric("selectExpr") })
+
+#' @rdname showDF
+#' @export
+setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
+
+#' @rdname sortDF
+#' @export
+setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") })
+
+#' @rdname subtract
+#' @export
+setGeneric("subtract", function(x, y) { standardGeneric("subtract") })
+
+#' @rdname tojson
+#' @export
+setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
+
+#' @rdname DataFrame
+#' @export
+setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
+
+#' @rdname unionAll
+#' @export
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
+#' @rdname filter
+#' @export
+setGeneric("where", function(x, condition) { standardGeneric("where") })
+
+#' @rdname withColumn
+#' @export
+setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") })
+
+#' @rdname withColumnRenamed
+#' @export
+setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
+  standardGeneric("withColumnRenamed") })
+
+
+###################### Column Methods ##########################
+
+#' @rdname column
+#' @export
+setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("asc", function(x) { standardGeneric("asc") })
+
+#' @rdname column
+#' @export
+setGeneric("avg", function(x, ...) { standardGeneric("avg") })
+
+#' @rdname column
+#' @export
+setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
+
+#' @rdname column
+#' @export
+setGeneric("contains", function(x, ...) { standardGeneric("contains") })
+#' @rdname column
+#' @export
+setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("desc", function(x) { standardGeneric("desc") })
+
+#' @rdname column
+#' @export
+setGeneric("endsWith", function(x, ...) { standardGeneric("endsWith") })
+
+#' @rdname column
+#' @export
+setGeneric("getField", function(x, ...) { standardGeneric("getField") })
+
+#' @rdname column
+#' @export
+setGeneric("getItem", function(x, ...) { standardGeneric("getItem") })
+
+#' @rdname column
+#' @export
+setGeneric("isNull", function(x) { standardGeneric("isNull") })
+
+#' @rdname column
+#' @export
+setGeneric("isNotNull", function(x) { standardGeneric("isNotNull") })
+
+#' @rdname column
+#' @export
+setGeneric("last", function(x) { standardGeneric("last") })
+
+#' @rdname column
+#' @export
+setGeneric("like", function(x, ...) { standardGeneric("like") })
+
+#' @rdname column
+#' @export
+setGeneric("lower", function(x) { standardGeneric("lower") })
+
+#' @rdname column
+#' @export
+setGeneric("rlike", function(x, ...) { standardGeneric("rlike") })
+
+#' @rdname column
+#' @export
+setGeneric("startsWith", function(x, ...) { standardGeneric("startsWith") })
+
+#' @rdname column
+#' @export
+setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })
+
+#' @rdname column
+#' @export
+setGeneric("upper", function(x) { standardGeneric("upper") })
+
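
generics.R only declares the S4 generics; the concrete methods live in RDD.R, pairRDD.R, DataFrame.R, column.R and friends elsewhere in this patch. A minimal standalone sketch of the declare-then-implement pattern, using a stand-in class that is not part of the patch:

# "Counter" and "countItems" are illustrative names; only the
# setGeneric/setMethod pattern mirrors what the patch does for RDD and DataFrame.
setClass("Counter", slots = list(n = "numeric"))

#' @rdname countItems
#' @export
setGeneric("countItems", function(x) { standardGeneric("countItems") })

setMethod("countItems",
          signature(x = "Counter"),
          function(x) {
            x@n
          })

countItems(new("Counter", n = 3))  # 3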

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
new file mode 100644
index 0000000..09fc0a7
--- /dev/null
+++ b/R/pkg/R/group.R
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# group.R - GroupedData class and methods implemented in S4 OO classes
+
+setOldClass("jobj")
+
+#' @title S4 class that represents a GroupedData
+#' @description A GroupedData can be created using groupBy() on a DataFrame
+#' @rdname GroupedData
+#' @seealso groupBy
+#'
+#' @param sgd A Java object reference to the backing Scala GroupedData
+#' @export
+setClass("GroupedData",
+         slots = list(sgd = "jobj"))
+
+setMethod("initialize", "GroupedData", function(.Object, sgd) {
+  .Object@sgd <- sgd
+  .Object
+})
+
+#' @rdname DataFrame
+groupedData <- function(sgd) {
+  new("GroupedData", sgd)
+}
+
+
+#' @rdname show
+setMethod("show", "GroupedData",
+          function(object) {
+            cat("GroupedData\n")
+          })
+
+#' Count
+#'
+#' Count the number of rows for each group.
+#' The resulting DataFrame will also contain the grouping columns.
+#'
+#' @param x a GroupedData
+#' @return a DataFrame
+#' @export
+#' @examples
+#' \dontrun{
+#'   count(groupBy(df, "name"))
+#' }
+setMethod("count",
+          signature(x = "GroupedData"),
+          function(x) {
+            dataFrame(callJMethod(x@sgd, "count"))
+          })
+
+#' Agg
+#'
+#' Aggregates on the entire DataFrame without groups.
+#' The resulting DataFrame will also contain the grouping columns.
+#'
+#' df2 <- agg(df, <column> = <aggFunction>)
+#' df2 <- agg(df, newColName = aggFunction(column))
+#'
+#' @param x a GroupedData
+#' @return a DataFrame
+#' @rdname agg
+#' @examples
+#' \dontrun{
+#'  df2 <- agg(df, age = "sum")  # new column name will be created as 'SUM(age#0)'
+#'  df2 <- agg(df, ageSum = sum(df$age)) # Creates a new column named ageSum
+#' }
+setGeneric("agg", function (x, ...) { standardGeneric("agg") })
+
+setMethod("agg",
+          signature(x = "GroupedData"),
+          function(x, ...) {
+            cols <- list(...)
+            stopifnot(length(cols) > 0)
+            if (is.character(cols[[1]])) {
+              cols <- varargsToEnv(...)
+              sdf <- callJMethod(x@sgd, "agg", cols)
+            } else if (class(cols[[1]]) == "Column") {
+              ns <- names(cols)
+              if (!is.null(ns)) {
+                for (n in ns) {
+                  if (n != "") {
+                    cols[[n]] <- alias(cols[[n]], n)
+                  }
+                }
+              }
+              jcols <- lapply(cols, function(c) { c@jc })
+              # the GroupedData.agg(col, cols*) API does not contain grouping Column
+              sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "aggWithGrouping",
+                                 x@sgd, listToSeq(jcols))
+            } else {
+              stop("agg can only support Column or character")
+            }
+            dataFrame(sdf)
+          })
+
+
+# sum/mean/avg/min/max
+methods <- c("sum", "mean", "avg", "min", "max")
+
+createMethod <- function(name) {
+  setMethod(name,
+            signature(x = "GroupedData"),
+            function(x, ...) {
+              sdf <- callJMethod(x@sgd, name, toSeq(...))
+              dataFrame(sdf)
+            })
+}
+
+createMethods <- function() {
+  for (name in methods) {
+    createMethod(name)
+  }
+}
+
+createMethods()
+
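
The createMethod()/createMethods() loop above registers one S4 method per aggregate name ("sum", "mean", "avg", "min", "max") on GroupedData, each forwarding to the Scala side. A standalone sketch of the same dynamic-registration pattern on a toy class (the names here are illustrative and not part of the patch):

setClass("Bag", slots = list(values = "numeric"))

setGeneric("bagSum",  function(x) { standardGeneric("bagSum") })
setGeneric("bagMean", function(x) { standardGeneric("bagMean") })

# Map each generic name to the base R function it should forward to.
forwards <- list(bagSum = sum, bagMean = mean)

createBagMethod <- function(name) {
  setMethod(name,
            signature(x = "Bag"),
            function(x) {
              forwards[[name]](x@values)
            })
}

invisible(lapply(names(forwards), createBagMethod))

b <- new("Bag", values = c(1, 2, 3))
bagSum(b)   # 6
bagMean(b)  # 2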

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/jobj.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/jobj.R b/R/pkg/R/jobj.R
new file mode 100644
index 0000000..4180f14
--- /dev/null
+++ b/R/pkg/R/jobj.R
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# References to objects that exist on the JVM backend
+# are maintained using the jobj. 
+
+# Maintain a reference count of Java object references
+# This allows us to GC the java object when it is safe
+.validJobjs <- new.env(parent = emptyenv())
+
+# List of object ids to be removed
+.toRemoveJobjs <- new.env(parent = emptyenv())
+
+# Check if jobj was created with the current SparkContext
+isValidJobj <- function(jobj) {
+  if (exists(".scStartTime", envir = .sparkREnv)) {
+    jobj$appId == get(".scStartTime", envir = .sparkREnv)
+  } else {
+    FALSE
+  }
+}
+
+getJobj <- function(objId) {
+  newObj <- jobj(objId)
+  if (exists(objId, .validJobjs)) {
+    .validJobjs[[objId]] <- .validJobjs[[objId]] + 1
+  } else {
+    .validJobjs[[objId]] <- 1
+  }
+  newObj
+}
+
+# Handler for a java object that exists on the backend.
+jobj <- function(objId) {
+  if (!is.character(objId)) {
+    stop("object id must be a character")
+  }
+  # NOTE: We need a new env for a jobj as we can only register
+  # finalizers for environments or external references pointers.
+  obj <- structure(new.env(parent = emptyenv()), class = "jobj")
+  obj$id <- objId
+  obj$appId <- get(".scStartTime", envir = .sparkREnv)
+
+  # Register a finalizer to remove the Java object when this reference
+  # is garbage collected in R
+  reg.finalizer(obj, cleanup.jobj)
+  obj
+}
+
+#' Print a JVM object reference.
+#'
+#' This function prints the type and id for an object stored
+#' in the SparkR JVM backend.
+#'
+#' @param x The JVM object reference
+#' @param ... further arguments passed to or from other methods
+print.jobj <- function(x, ...) {
+  cls <- callJMethod(x, "getClass")
+  name <- callJMethod(cls, "getName")
+  cat("Java ref type", name, "id", x$id, "\n", sep = " ")
+}
+
+cleanup.jobj <- function(jobj) {
+  if (isValidJobj(jobj)) {
+    objId <- jobj$id
+    # If we don't know anything about this jobj, ignore it
+    if (exists(objId, envir = .validJobjs)) {
+      .validJobjs[[objId]] <- .validJobjs[[objId]] - 1
+
+      if (.validJobjs[[objId]] == 0) {
+        rm(list = objId, envir = .validJobjs)
+        # NOTE: We cannot call removeJObject here as the finalizer may be run
+        # in the middle of another RPC. Thus we queue up this object id to be removed
+        # and then call removeJObject for all queued ids when the next RPC is made.
+        .toRemoveJobjs[[objId]] <- 1
+      }
+    }
+  }
+}
+
+clearJobjs <- function() {
+  valid <- ls(.validJobjs)
+  rm(list = valid, envir = .validJobjs)
+
+  removeList <- ls(.toRemoveJobjs)
+  rm(list = removeList, envir = .toRemoveJobjs)
+}
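
jobj.R ties the lifetime of backend objects to R's garbage collector: each reference is an environment with a registered finalizer, a reference count is kept per object id, and ids whose count reaches zero are queued for removal on the next RPC. A standalone sketch of that pattern without a JVM (all names below are illustrative):

refCounts <- new.env(parent = emptyenv())
released <- character(0)

newRef <- function(id) {
  ref <- new.env(parent = emptyenv())
  ref$id <- id
  if (exists(id, envir = refCounts)) {
    refCounts[[id]] <- refCounts[[id]] + 1
  } else {
    refCounts[[id]] <- 1
  }
  # The finalizer runs when this environment is garbage collected in R,
  # mirroring cleanup.jobj above. Instead of talking to a backend we just
  # record the id, the way .toRemoveJobjs queues ids for the next RPC.
  reg.finalizer(ref, function(r) {
    refCounts[[r$id]] <- refCounts[[r$id]] - 1
    if (refCounts[[r$id]] == 0) {
      released <<- c(released, r$id)
    }
  })
  ref
}

a <- newRef("obj-1")
b <- newRef("obj-1")    # second R-side reference to the same backend id
rm(a, b)
invisible(gc())
released                # "obj-1", once both references have been collected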

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/pairRDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R
new file mode 100644
index 0000000..c2396c3
--- /dev/null
+++ b/R/pkg/R/pairRDD.R
@@ -0,0 +1,789 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Operations supported on RDDs containing pairs (i.e. key, value)
+
+############ Actions and Transformations ############
+
+#' Look up elements of a key in an RDD
+#'
+#' @description
+#' \code{lookup} returns a list of values in this RDD for the key \code{key}.
+#'
+#' @param x The RDD to look up the key in
+#' @param key The key to look up
+#' @return a list of values in this RDD for the key \code{key}
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
+#' rdd <- parallelize(sc, pairs)
+#' lookup(rdd, 1) # list(1, 3)
+#'}
+#' @rdname lookup
+#' @aliases lookup,RDD-method
+setMethod("lookup",
+          signature(x = "RDD", key = "ANY"),
+          function(x, key) {
+            partitionFunc <- function(part) {
+              filtered <- part[unlist(lapply(part, function(i) { identical(key, i[[1]]) }))]
+              lapply(filtered, function(i) { i[[2]] })
+            }
+            valsRDD <- lapplyPartition(x, partitionFunc)
+            collect(valsRDD)
+          })
+
+#' Count the number of elements for each key, and return the result to the
+#' master as lists of (key, count) pairs.
+#'
+#' Same as countByKey in Spark.
+#'
+#' @param x The RDD to count keys.
+#' @return list of (key, count) pairs, where count is the number of occurrences of each key in the RDD.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
+#' countByKey(rdd) # ("a", 2L), ("b", 1L)
+#'}
+#' @rdname countByKey
+#' @aliases countByKey,RDD-method
+setMethod("countByKey",
+          signature(x = "RDD"),
+          function(x) {
+            keys <- lapply(x, function(item) { item[[1]] })
+            countByValue(keys)
+          })
+
+#' Return an RDD with the keys of each tuple.
+#'
+#' @param x The RDD from which the keys of each tuple are returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(keys(rdd)) # list(1, 3)
+#'}
+#' @rdname keys
+#' @aliases keys,RDD
+setMethod("keys",
+          signature(x = "RDD"),
+          function(x) {
+            func <- function(k) {
+              k[[1]]
+            }
+            lapply(x, func)
+          })
+
+#' Return an RDD with the values of each tuple.
+#'
+#' @param x The RDD from which the values of each tuple are returned.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+#' collect(values(rdd)) # list(2, 4)
+#'}
+#' @rdname values
+#' @aliases values,RDD
+setMethod("values",
+          signature(x = "RDD"),
+          function(x) {
+            func <- function(v) {
+              v[[2]]
+            }
+            lapply(x, func)
+          })
+
+#' Applies a function to all values of the elements, without modifying the keys.
+#'
+#' The same as \code{mapValues()} in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:10)
+#' makePairs <- lapply(rdd, function(x) { list(x, x) })
+#' collect(mapValues(makePairs, function(x) { x * 2 }))
+#' Output: list(list(1,2), list(2,4), list(3,6), ...)
+#'}
+#' @rdname mapValues
+#' @aliases mapValues,RDD,function-method
+setMethod("mapValues",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            func <- function(x) {
+              list(x[[1]], FUN(x[[2]]))
+            }
+            lapply(X, func)
+          })
+
+#' Pass each value in the key-value pair RDD through a flatMap function without
+#' changing the keys; this also retains the original RDD's partitioning.
+#'
+#' The same as \code{flatMapValues()} in Spark.
+#'
+#' @param X The RDD to apply the transformation.
+#' @param FUN the transformation to apply on the value of each element.
+#' @return a new RDD created by the transformation.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+#' collect(flatMapValues(rdd, function(x) { x }))
+#' Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
+#'}
+#' @rdname flatMapValues
+#' @aliases flatMapValues,RDD,function-method
+setMethod("flatMapValues",
+          signature(X = "RDD", FUN = "function"),
+          function(X, FUN) {
+            flatMapFunc <- function(x) {
+              lapply(FUN(x[[2]]), function(v) { list(x[[1]], v) })
+            }
+            flatMap(X, flatMapFunc)
+          })
+
+############ Shuffle Functions ############
+
+#' Partition an RDD by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
+#' For each element of this RDD, the partition function is used to compute a hash
+#' value for the key, and the RDD is partitioned using this hash value.
+#'
+#' @param x The RDD to partition. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @param ... Other optional arguments to partitionBy.
+#'
+#' @param partitionFunc The partition function to use. Uses a default hashCode
+#'                      function if not provided
+#' @return An RDD partitioned using the specified partitioner.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- partitionBy(rdd, 2L)
+#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
+#'}
+#' @rdname partitionBy
+#' @aliases partitionBy,RDD,integer-method
+setMethod("partitionBy",
+          signature(x = "RDD", numPartitions = "integer"),
+          function(x, numPartitions, partitionFunc = hashCode) {
+
+            #if (missing(partitionFunc)) {
+            #  partitionFunc <- hashCode
+            #}
+
+            partitionFunc <- cleanClosure(partitionFunc)
+            serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)
+
+            packageNamesArr <- serialize(.sparkREnv$.packages,
+                                         connection = NULL)
+            broadcastArr <- lapply(ls(.broadcastNames), function(name) {
+                                   get(name, .broadcastNames) })
+            jrdd <- getJRDD(x)
+
+            # We create a PairwiseRRDD that extends RDD[(Array[Byte],
+            # Array[Byte])], where the key is the hashed split, the value is
+            # the content (key-val pairs).
+            pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
+                                       callJMethod(jrdd, "rdd"),
+                                       as.integer(numPartitions),
+                                       serializedHashFuncBytes,
+                                       getSerializedMode(x),
+                                       packageNamesArr,
+                                       as.character(.sparkREnv$libname),
+                                       broadcastArr,
+                                       callJMethod(jrdd, "classTag"))
+
+            # Create a corresponding partitioner.
+            rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
+                                       as.integer(numPartitions))
+
+            # Call partitionBy on the obtained PairwiseRDD.
+            javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
+            javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)
+
+            # Call .values() on the result to get back the final result, the
+            # shuffled actual content key-val pairs.
+            r <- callJMethod(javaPairRDD, "values")
+
+            RDD(r, serializedMode = "byte")
+          })
+
+#' Group values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and groups values for each key in the RDD into a single sequence.
+#'
+#' @param x The RDD to group. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, list(V))
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- groupByKey(rdd, 2L)
+#' grouped <- collect(parts)
+#' grouped[[1]] # Should be a list(1, list(2, 4))
+#'}
+#' @rdname groupByKey
+#' @aliases groupByKey,RDD,integer-method
+setMethod("groupByKey",
+          signature(x = "RDD", numPartitions = "integer"),
+          function(x, numPartitions) {
+            shuffled <- partitionBy(x, numPartitions)
+            groupVals <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              appendList <- function(acc, i) {
+                addItemToAccumulator(acc, i)
+                acc
+              }
+              makeList <- function(i) {
+                acc <- initAccumulator()
+                addItemToAccumulator(acc, i)
+                acc
+              }
+              # Each item in the partition is list of (K, V)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred,
+                                          appendList, makeList)
+                     })
+              # extract out data field
+              vals <- eapply(vals,
+                             function(i) {
+                               length(i$data) <- i$counter
+                               i$data
+                             })
+              # Every key in the environment contains a list
+              # Convert that to list(K, Seq[V])
+              convertEnvsToList(keys, vals)
+            }
+            lapplyPartition(shuffled, groupVals)
+          })
+
+#' Merge values by key
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, V') where V' is the merged
+#'         value
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- reduceByKey(rdd, "+", 2L)
+#' reduced <- collect(parts)
+#' reduced[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname reduceByKey
+#' @aliases reduceByKey,RDD,integer-method
+setMethod("reduceByKey",
+          signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"),
+          function(x, combineFunc, numPartitions) {
+            reduceVals <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              convertEnvsToList(keys, vals)
+            }
+            locallyReduced <- lapplyPartition(x, reduceVals)
+            shuffled <- partitionBy(locallyReduced, numPartitions)
+            lapplyPartition(shuffled, reduceVals)
+          })
+
+#' Merge values by key locally
+#'
+#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
+#' and merges the values for each key using an associative reduce function, but returns the
+#' results immediately to the driver as an R list.
+#'
+#' @param x The RDD to reduce by key. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param combineFunc The associative reduce function to use.
+#' @return A list of elements of type list(K, V') where V' is the merged value for each key
+#' @seealso reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' reduced <- reduceByKeyLocally(rdd, "+")
+#' reduced # list(list(1, 6), list(1.1, 3))
+#'}
+#' @rdname reduceByKeyLocally
+#' @aliases reduceByKeyLocally,RDD,integer-method
+setMethod("reduceByKeyLocally",
+          signature(x = "RDD", combineFunc = "ANY"),
+          function(x, combineFunc) {
+            reducePart <- function(part) {
+              vals <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(hashCode(item[[1]]))
+                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
+                     })
+              list(list(keys, vals)) # return hash to avoid re-compute in merge
+            }
+            mergeParts <- function(accum, x) {
+              pred <- function(item) {
+                exists(item$hash, accum[[1]])
+              }
+              lapply(ls(x[[1]]),
+                     function(name) {
+                       item <- list(x[[1]][[name]], x[[2]][[name]])
+                       item$hash <- name
+                       updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
+                     })
+              accum
+            }
+            reduced <- mapPartitions(x, reducePart)
+            merged <- reduce(reduced, mergeParts)
+            convertEnvsToList(merged[[1]], merged[[2]])
+          })
+
+#' Combine values by key
+#'
+#' Generic function to combine the elements for each key using a custom set of
+#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
+#' for a "combined type" C. Note that V and C can be different -- for example, one
+#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
+
+#' Users provide three functions:
+#' \itemize{
+#'   \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
+#'   \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
+#'   \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
+#'    two lists).
+#' }
+#'
+#' @param x The RDD to combine. Should be an RDD where each element is
+#'             list(K, V) or c(K, V).
+#' @param createCombiner Create a combiner (C) given a value (V)
+#' @param mergeValue Merge the given value (V) with an existing combiner (C)
+#' @param mergeCombiners Merge two combiners and return a new combiner
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where each element is list(K, C) where C is the combined type
+#'
+#' @seealso groupByKey, reduceByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
+#' rdd <- parallelize(sc, pairs)
+#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
+#' combined <- collect(parts)
+#' combined[[1]] # Should be a list(1, 6)
+#'}
+#' @rdname combineByKey
+#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
+setMethod("combineByKey",
+          signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
+                    mergeCombiners = "ANY", numPartitions = "integer"),
+          function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
+            combineLocally <- function(part) {
+              combiners <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
+                     })
+              convertEnvsToList(keys, combiners)
+            }
+            locallyCombined <- lapplyPartition(x, combineLocally)
+            shuffled <- partitionBy(locallyCombined, numPartitions)
+            mergeAfterShuffle <- function(part) {
+              combiners <- new.env()
+              keys <- new.env()
+              pred <- function(item) exists(item$hash, keys)
+              lapply(part,
+                     function(item) {
+                       item$hash <- as.character(item[[1]])
+                       updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
+                     })
+              convertEnvsToList(keys, combiners)
+            }
+            lapplyPartition(shuffled, mergeAfterShuffle)
+          })
+
+#' Aggregate a pair RDD by each key.
+#' 
+#' Aggregate the values of each key in an RDD, using given combine functions
+#' and a neutral "zero value". This function can return a different result type,
+#' U, than the type of the values in this RDD, V. Thus, we need one operation
+#' for merging a V into a U and one operation for merging two U's. The former
+#' operation is used for merging values within a partition, and the latter is 
+#' used for merging values between partitions. To avoid memory allocation, both 
+#' of these functions are allowed to modify and return their first argument 
+#' instead of creating a new U.
+#' 
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param seqOp A function to aggregate the values of each key. It may return 
+#'              a different result type from the type of the values.
+#' @param combOp A function to aggregate results of seqOp.
+#' @return An RDD containing the aggregation result.
+#' @seealso foldByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' zeroValue <- list(0, 0)
+#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) 
+#'   # list(list(1, list(3, 2)), list(2, list(7, 2)))
+#'}
+#' @rdname aggregateByKey
+#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
+setMethod("aggregateByKey",
+          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
+                    combOp = "ANY", numPartitions = "integer"),
+          function(x, zeroValue, seqOp, combOp, numPartitions) {
+            createCombiner <- function(v) {
+              do.call(seqOp, list(zeroValue, v))
+            }
+
+            combineByKey(x, createCombiner, seqOp, combOp, numPartitions)
+          })
+
+#' Fold a pair RDD by each key.
+#' 
+#' Aggregate the values of each key in an RDD, using an associative function "func"
+#' and a neutral "zero value" which may be added to the result an arbitrary 
+#' number of times, and must not change the result (e.g., 0 for addition, or 
+#' 1 for multiplication).
+#' 
+#' @param x An RDD.
+#' @param zeroValue A neutral "zero value".
+#' @param func An associative function for folding values of each key.
+#' @return An RDD containing the aggregation result.
+#' @seealso aggregateByKey, combineByKey
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
+#'}
+#' @rdname foldByKey
+#' @aliases foldByKey,RDD,ANY,ANY,integer-method
+setMethod("foldByKey",
+          signature(x = "RDD", zeroValue = "ANY",
+                    func = "ANY", numPartitions = "integer"),
+          function(x, zeroValue, func, numPartitions) {
+            aggregateByKey(x, zeroValue, func, func, numPartitions)
+          })
+
+############ Binary Functions #############
+
+#' Join two RDDs
+#'
+#' @description
+#' \code{join}: This function joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return a new RDD containing all pairs of elements with matching keys in
+#'         two input RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
+#'}
+#' @rdname join-methods
+#' @aliases join,RDD,RDD-method
+setMethod("join",
+          signature(x = "RDD", y = "RDD"),
+          function(x, y, numPartitions) {
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+            
+            doJoin <- function(v) {
+              joinTaggedList(v, list(FALSE, FALSE))
+            }
+            
+            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
+                                    doJoin)
+          })
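For readers following the implementation above, a small sketch (assuming `sc` exists; illustrative only) of the tagging step: each value is wrapped with the index of the RDD it came from, so that groupByKey can bring both sides of a key together before joinTaggedList unpacks them into (v, w) pairs.

  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  tagged <- lapply(rdd1, function(i) { list(i[[1]], list(1L, i[[2]])) })  # 1L marks the left-hand RDD
  collect(tagged)  # list(list(1, list(1, 1)), list(2, list(1, 4)))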
+
+#' Left outer join two RDDs
+#'
+#' @description
+#' \code{leftOuterJoin}: This function left-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x, the resulting RDD will either contain
+#'         all pairs (k, (v, w)) for (k, w) in y, or the pair (k, (v, NULL))
+#'         if no elements in y have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' leftOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases leftOuterJoin,RDD,RDD-method
+setMethod("leftOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+            
+            doJoin <- function(v) {
+              joinTaggedList(v, list(FALSE, TRUE))
+            }
+            
+            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' Right outer join two RDDs
+#'
+#' @description
+#' \code{rightOuterJoin}: This function right-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, w) in y, the resulting RDD will either contain
+#'         all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
+#'         if no elements in x have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rightOuterJoin(rdd1, rdd2, 2L)
+#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+#'}
+#' @rdname join-methods
+#' @aliases rightOuterJoin,RDD,RDD-method
+setMethod("rightOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+            
+            doJoin <- function(v) {
+              joinTaggedList(v, list(TRUE, FALSE))
+            }
+            
+            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' Full outer join two RDDs
+#'
+#' @description
+#' \code{fullOuterJoin}: This function full-outer-joins two RDDs where every element is of the form list(K, V).
+#' The key types of the two RDDs should be the same.
+#'
+#' @param x An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param y An RDD to be joined. Should be an RDD where each element is
+#'             list(K, V).
+#' @param numPartitions Number of partitions to create.
+#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
+#'         will contain all pairs (k, (v, w)) for both (k, v) in x and
+#'         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements 
+#'         in x/y have key k.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
+#'                               #      list(1, list(3, 1)),
+#'                               #      list(2, list(NULL, 4)),
+#'                               #      list(3, list(3, NULL)))
+#'}
+#' @rdname join-methods
+#' @aliases fullOuterJoin,RDD,RDD-method
+setMethod("fullOuterJoin",
+          signature(x = "RDD", y = "RDD", numPartitions = "integer"),
+          function(x, y, numPartitions) {
+            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
+            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
+
+            doJoin <- function(v) {
+              joinTaggedList(v, list(TRUE, TRUE))
+            }
+
+            joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
+          })
+
+#' For each key k in several RDDs, return a resulting RDD whose
+#' values are a list of the values for that key in all of the RDDs.
+#'
+#' @param ... Several RDDs.
+#' @param numPartitions Number of partitions to create.
+#' @return A new RDD where each element is list(K, result), and the result is
+#'         a list that holds, for each input RDD, the list of values for K.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+#' cogroup(rdd1, rdd2, numPartitions = 2L) 
+#' # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
+#'}
+#' @rdname cogroup
+#' @aliases cogroup,RDD-method
+setMethod("cogroup",
+          "RDD",
+          function(..., numPartitions) {
+            rdds <- list(...)
+            rddsLen <- length(rdds)
+            for (i in 1:rddsLen) {
+              rdds[[i]] <- lapply(rdds[[i]], 
+                                  function(x) { list(x[[1]], list(i, x[[2]])) })
+              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
+              # will not be captured into UDF if getJRDD is not invoked.
+              # It should be resolved together with that issue.
+              getJRDD(rdds[[i]])  # Capture the closure.
+            }
+            union.rdd <- Reduce(unionRDD, rdds)
+            group.func <- function(vlist) {
+              res <- list()
+              length(res) <- rddsLen
+              for (x in vlist) {
+                i <- x[[1]]
+                acc <- res[[i]]
+                # Create an accumulator.
+                if (is.null(acc)) {
+                  acc <- initAccumulator()
+                }
+                addItemToAccumulator(acc, x[[2]])
+                res[[i]] <- acc
+              }
+              lapply(res, function(acc) {
+                if (is.null(acc)) {
+                  list()
+                } else {
+                  acc$data
+                }
+              })
+            }
+            cogroup.rdd <- mapValues(groupByKey(union.rdd, numPartitions), 
+                                     group.func)
+          })
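A hedged sketch of the variadic form with three RDDs (assuming `sc` from sparkR.init()): each key maps to one list of values per input RDD, with an empty list where the key is absent.

  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  rdd3 <- parallelize(sc, list(list(2, 5)))
  collect(cogroup(rdd1, rdd2, rdd3, numPartitions = 2L))
  # key 1 -> list(list(1), list(2, 3), list())
  # key 2 -> list(list(4), list(), list(5))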
+
+#' Sort a (k, v) pair RDD by k.
+#'
+#' @param x A (k, v) pair RDD to be sorted.
+#' @param ascending A flag to indicate whether the sorting is ascending or descending.
+#' @param numPartitions Number of partitions to create.
+#' @return An RDD where all (k, v) pair elements are sorted.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
+#' collect(sortByKey(rdd)) # list(list(1, 3), list(2, 2), list(3, 1))
+#'}
+#' @rdname sortByKey
+#' @aliases sortByKey,RDD-method
+setMethod("sortByKey",
+          signature(x = "RDD"),
+          function(x, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) {
+            rangeBounds <- list()
+            
+            if (numPartitions > 1) {
+              rddSize <- count(x)
+              # constant from Spark's RangePartitioner
+              maxSampleSize <- numPartitions * 20
+              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)
+              
+              samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L)))
+              
+              # Note: the built-in R sort() function only works on atomic vectors
+              samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)
+              
+              if (length(samples) > 0) {
+                rangeBounds <- lapply(seq_len(numPartitions - 1),
+                                      function(i) {
+                                        j <- ceiling(length(samples) * i / numPartitions)
+                                        samples[j]
+                                      })
+              }
+            }
+
+            rangePartitionFunc <- function(key) {
+              partition <- 0
+              
+              # TODO: Use binary search instead of linear search, similar with Spark
+              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
+                partition <- partition + 1
+              }
+              
+              if (ascending) {
+                partition
+              } else {
+                numPartitions - partition - 1
+              }
+            }
+            
+            partitionFunc <- function(part) {
+              sortKeyValueList(part, decreasing = !ascending)
+            }
+            
+            newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
+            lapplyPartition(newRDD, partitionFunc)
+          })
+          
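A brief sortByKey usage sketch (assuming a SparkContext `sc`): because keys are range-partitioned, collect() on the result returns a globally sorted list.

  rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
  collect(sortByKey(rdd))                     # list(list(1, 3), list(2, 2), list(3, 1))
  collect(sortByKey(rdd, ascending = FALSE))  # list(list(3, 1), list(2, 2), list(1, 3))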

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/serialize.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R
new file mode 100644
index 0000000..8a9c0c6
--- /dev/null
+++ b/R/pkg/R/serialize.R
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utility functions to serialize R objects so they can be read in Java.
+
+# Type mapping from R to Java
+#
+# NULL -> Void
+# integer -> Int
+# character -> String
+# logical -> Boolean
+# double, numeric -> Double
+# raw -> Array[Byte]
+# Date -> Date
+# POSIXct,POSIXlt -> Time
+#
+# list[T] -> Array[T], where T is one of above mentioned types
+# environment -> Map[String, T], where T is a native type
+# jobj -> Object, where jobj is an object created in the backend
+
+writeObject <- function(con, object, writeType = TRUE) {
+  # NOTE: In R, scalars and vectors of the same type share one class, so a
+  # vector cannot be told apart from a single value. We therefore don't support
+  # passing in vectors as arrays and instead require arrays to be passed as lists.
+  type <- class(object)[[1]]  # class of POSIXlt is c("POSIXlt", "POSIXt")
+  if (writeType) {
+    writeType(con, type)
+  }
+  switch(type,
+         NULL = writeVoid(con),
+         integer = writeInt(con, object),
+         character = writeString(con, object),
+         logical = writeBoolean(con, object),
+         double = writeDouble(con, object),
+         numeric = writeDouble(con, object),
+         raw = writeRaw(con, object),
+         list = writeList(con, object),
+         jobj = writeJobj(con, object),
+         environment = writeEnv(con, object),
+         Date = writeDate(con, object),
+         POSIXlt = writeTime(con, object),
+         POSIXct = writeTime(con, object),
+         stop(paste("Unsupported type for serialization", type)))
+}
+
+writeVoid <- function(con) {
+  # no value for NULL
+}
+
+writeJobj <- function(con, value) {
+  if (!isValidJobj(value)) {
+    stop("invalid jobj ", value$id)
+  }
+  writeString(con, value$id)
+}
+
+writeString <- function(con, value) {
+  writeInt(con, as.integer(nchar(value) + 1))
+  writeBin(value, con, endian = "big")
+}
+
+writeInt <- function(con, value) {
+  writeBin(as.integer(value), con, endian = "big")
+}
+
+writeDouble <- function(con, value) {
+  writeBin(value, con, endian = "big")
+}
+
+writeBoolean <- function(con, value) {
+  # TRUE becomes 1, FALSE becomes 0
+  writeInt(con, as.integer(value))
+}
+
+writeRawSerialize <- function(outputCon, batch) {
+  outputSer <- serialize(batch, ascii = FALSE, connection = NULL)
+  writeRaw(outputCon, outputSer)
+}
+
+writeRowSerialize <- function(outputCon, rows) {
+  invisible(lapply(rows, function(r) {
+    bytes <- serializeRow(r)
+    writeRaw(outputCon, bytes)
+  }))
+}
+
+serializeRow <- function(row) {
+  rawObj <- rawConnection(raw(0), "wb")
+  on.exit(close(rawObj))
+  writeRow(rawObj, row)
+  rawConnectionValue(rawObj)
+}
+
+writeRow <- function(con, row) {
+  numCols <- length(row)
+  writeInt(con, numCols)
+  for (i in 1:numCols) {
+    writeObject(con, row[[i]])
+  }
+}
+
+writeRaw <- function(con, batch) {
+  writeInt(con, length(batch))
+  writeBin(batch, con, endian = "big")
+}
+
+writeType <- function(con, class) {
+  type <- switch(class,
+                 NULL = "n",
+                 integer = "i",
+                 character = "c",
+                 logical = "b",
+                 double = "d",
+                 numeric = "d",
+                 raw = "r",
+                 list = "l",
+                 jobj = "j",
+                 environment = "e",
+                 Date = "D",
+                 POSIXlt = "t",
+                 POSIXct = "t",
+                 stop(paste("Unsupported type for serialization", class)))
+  writeBin(charToRaw(type), con)
+}
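As a hedged local sketch of the resulting wire format (assuming the helpers in this file are sourced; the values are arbitrary), each object is written as a one-byte type tag followed by its big-endian payload:

  con <- rawConnection(raw(0), "wb")
  writeObject(con, 42L)   # tag "i", then a 4-byte big-endian integer
  writeObject(con, "hi")  # tag "c", then the length (3) and the NUL-terminated string
  bytes <- rawConnectionValue(con)
  close(con)
  length(bytes)           # 13 bytes in this sketch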
+
+# Used to pass arrays where all the elements are of the same type
+writeList <- function(con, arr) {
+  # All elements should be of same type
+  elemType <- unique(sapply(arr, function(elem) { class(elem) }))
+  stopifnot(length(elemType) <= 1)
+
+  # TODO: Empty lists are given type "character" right now.
+  # This may not work if the Java side expects array of any other type.
+  if (length(elemType) == 0) {
+    elemType <- class("somestring")
+  }
+
+  writeType(con, elemType)
+  writeInt(con, length(arr))
+
+  if (length(arr) > 0) {
+    for (a in arr) {
+      writeObject(con, a, FALSE)
+    }
+  }
+}
+
+# Used to pass in hash maps required on Java side.
+writeEnv <- function(con, env) {
+  len <- length(env)
+
+  writeInt(con, len)
+  if (len > 0) {
+    writeList(con, as.list(ls(env)))
+    vals <- lapply(ls(env), function(x) { env[[x]] })
+    writeList(con, as.list(vals))
+  }
+}
+
+writeDate <- function(con, date) {
+  writeString(con, as.character(date))
+}
+
+writeTime <- function(con, time) {
+  writeDouble(con, as.double(time))
+}
+
+# Used to serialize in a list of objects where each
+# object can be of a different type. Serialization format is
+# <object type> <object> for each object
+writeArgs <- function(con, args) {
+  if (length(args) > 0) {
+    for (a in args) {
+      writeObject(con, a)
+    }
+  }
+}
+
+writeStrings <- function(con, stringList) {
+  writeLines(unlist(stringList), con)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
new file mode 100644
index 0000000..bc82df0
--- /dev/null
+++ b/R/pkg/R/sparkR.R
@@ -0,0 +1,266 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+.sparkREnv <- new.env()
+
+sparkR.onLoad <- function(libname, pkgname) {
+  .sparkREnv$libname <- libname
+}
+
+# Utility function that returns TRUE if we have an active connection to the
+# backend and FALSE otherwise
+connExists <- function(env) {
+  tryCatch({
+    exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
+  }, error = function(err) {
+    return(FALSE)
+  })
+}
+
+#' Stop the Spark context.
+#'
+#' Also terminates the backend this R session is connected to
+sparkR.stop <- function() {
+  env <- .sparkREnv
+  if (exists(".sparkRCon", envir = env)) {
+    # cat("Stopping SparkR\n")
+    if (exists(".sparkRjsc", envir = env)) {
+      sc <- get(".sparkRjsc", envir = env)
+      callJMethod(sc, "stop")
+      rm(".sparkRjsc", envir = env)
+    }
+  
+    if (exists(".backendLaunched", envir = env)) {
+      callJStatic("SparkRHandler", "stopBackend")
+    }
+
+    # Also close the connection and remove it from our env
+    conn <- get(".sparkRCon", envir = env)
+    close(conn)
+
+    rm(".sparkRCon", envir = env)
+    rm(".scStartTime", envir = env)
+  }
+
+  if (exists(".monitorConn", envir = env)) {
+    conn <- get(".monitorConn", envir = env)
+    close(conn)
+    rm(".monitorConn", envir = env)
+  }
+
+  # Clear all broadcast variables we have
+  # as the jobj will not be valid if we restart the JVM
+  clearBroadcastVariables()
+
+  # Clear jobj maps
+  clearJobjs()
+}
+
+#' Initialize a new Spark Context.
+#'
+#' This function initializes a new SparkContext.
+#'
+#' @param master The Spark master URL.
+#' @param appName Application name to register with cluster manager
+#' @param sparkHome Spark Home directory
+#' @param sparkEnvir Named list of environment variables to set on worker nodes.
+#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
+#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
+#' @param sparkRLibDir The path where R is installed on the worker nodes.
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
+#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
+#'                  list(spark.executor.memory="1g"))
+#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
+#'                  list(spark.executor.memory="1g"),
+#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
+#'                  c("jarfile1.jar","jarfile2.jar"))
+#'}
+
+sparkR.init <- function(
+  master = "",
+  appName = "SparkR",
+  sparkHome = Sys.getenv("SPARK_HOME"),
+  sparkEnvir = list(),
+  sparkExecutorEnv = list(),
+  sparkJars = "",
+  sparkRLibDir = "") {
+
+  if (exists(".sparkRjsc", envir = .sparkREnv)) {
+    cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
+    return(get(".sparkRjsc", envir = .sparkREnv))
+  }
+
+  sparkMem <- Sys.getenv("SPARK_MEM", "512m")
+  jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
+
+  # Classpath separator is ";" on Windows
+  # On Windows the file URI needs four slashes ("////"); see http://stackoverflow.com/a/18522792
+  if (.Platform$OS.type == "unix") {
+    collapseChar <- ":"
+    uriSep <- "//"
+  } else {
+    collapseChar <- ";"
+    uriSep <- "////"
+  }
+
+  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
+  if (existingPort != "") {
+    backendPort <- existingPort
+  } else {
+    path <- tempfile(pattern = "backend_port")
+    launchBackend(
+        args = path,
+        sparkHome = sparkHome,
+        jars = jars,
+        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
+    # wait at most 100 seconds for the JVM to launch
+    wait <- 0.1
+    for (i in 1:25) {
+      Sys.sleep(wait)
+      if (file.exists(path)) {
+        break
+      }
+      wait <- wait * 1.25
+    }
+    if (!file.exists(path)) {
+      stop("JVM is not ready after 10 seconds")
+    }
+    f <- file(path, open = "rb")
+    backendPort <- readInt(f)
+    monitorPort <- readInt(f)
+    close(f)
+    file.remove(path)
+    if (length(backendPort) == 0 || backendPort == 0 ||
+        length(monitorPort) == 0 || monitorPort == 0) {
+      stop("JVM failed to launch")
+    }
+    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
+    assign(".backendLaunched", 1, envir = .sparkREnv)
+  }
+
+  .sparkREnv$backendPort <- backendPort
+  tryCatch({
+    connectBackend("localhost", backendPort)
+  }, error = function(err) {
+    stop("Failed to connect JVM\n")
+  })
+
+  if (nchar(sparkHome) != 0) {
+    sparkHome <- normalizePath(sparkHome)
+  }
+
+  if (nchar(sparkRLibDir) != 0) {
+    .sparkREnv$libname <- sparkRLibDir
+  }
+
+  sparkEnvirMap <- new.env()
+  for (varname in names(sparkEnvir)) {
+    sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
+  }
+  
+  sparkExecutorEnvMap <- new.env()
+  if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
+    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
+  }
+  for (varname in names(sparkExecutorEnv)) {
+    sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
+  }
+
+  nonEmptyJars <- Filter(function(x) { x != "" }, jars)
+  localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
+
+  # Set the start time to identify jobjs
+  # Seconds resolution is good enough for this purpose, so use ints
+  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
+
+  assign(
+    ".sparkRjsc",
+    callJStatic(
+      "org.apache.spark.api.r.RRDD",
+      "createSparkContext",
+      master,
+      appName,
+      as.character(sparkHome),
+      as.list(localJarPaths),
+      sparkEnvirMap,
+      sparkExecutorEnvMap),
+    envir = .sparkREnv
+  )
+
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
+
+  # Register a finalizer to sleep 1 seconds on R exit to make RStudio happy
+  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
+
+  sc
+}
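A minimal lifecycle sketch (the master URL and application name below are placeholders, not values required by this patch):

  sc <- sparkR.init(master = "local[2]", appName = "lifecycle-example")
  rdd <- parallelize(sc, 1:10, 2L)
  count(rdd)     # 10
  sparkR.stop()  # stops the context, shuts down the backend and clears cached jobjs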
+
+#' Initialize a new SQLContext.
+#'
+#' This function uses the SparkContext wrapped by an existing JavaSparkContext
+#' to initialize a new SQLContext.
+#'
+#' @param jsc The existing JavaSparkContext created with sparkR.init()
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#'}
+
+sparkRSQL.init <- function(jsc) {
+  if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
+    return(get(".sparkRSQLsc", envir = .sparkREnv))
+  }
+
+  sqlCtx <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                        "createSQLContext",
+                        jsc)
+  assign(".sparkRSQLsc", sqlCtx, envir = .sparkREnv)
+  sqlCtx
+}
+
+#' Initialize a new HiveContext.
+#'
+#' This function creates a HiveContext from an existing JavaSparkContext
+#'
+#' @param jsc The existing JavaSparkContext created with sparkR.init()
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRHive.init(sc)
+#'}
+
+sparkRHive.init <- function(jsc) {
+  if (exists(".sparkRHivesc", envir = .sparkREnv)) {
+    return(get(".sparkRHivesc", envir = .sparkREnv))
+  }
+
+  ssc <- callJMethod(jsc, "sc")
+  hiveCtx <- tryCatch({
+    newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
+  }, error = function(err) {
+    stop("Spark SQL is not built with Hive support")
+  })
+
+  assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
+  hiveCtx
+}
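A short combined usage sketch (illustrative; the Hive call only succeeds when Spark was built with Hive support):

  sc <- sparkR.init()
  sqlCtx <- sparkRSQL.init(sc)
  # hiveCtx <- sparkRHive.init(sc)  # stops with an error if Hive support is absent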


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org