You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sh...@apache.org on 2015/04/09 07:45:59 UTC

[3/7] spark git commit: [SPARK-5654] Integrate SparkR

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
new file mode 100644
index 0000000..c337fb0
--- /dev/null
+++ b/R/pkg/R/utils.R
@@ -0,0 +1,467 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Utilities and Helpers
+
+# Given a JList<T>, returns an R list containing the same elements, the number
+# of which is optionally upper bounded by `logicalUpperBound` (by default,
+# return all elements).  Takes care of deserializations and type conversions.
+convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
+  serializedMode = "byte") {
+  arrSize <- callJMethod(jList, "size")
+
+  # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()):
+  # each partition is not dense-packed into one Array[Byte], and `arrSize`
+  # here corresponds to number of logical elements. Thus we can prune here.
+  if (serializedMode == "string" && !is.null(logicalUpperBound)) {
+    arrSize <- min(arrSize, logicalUpperBound)
+  }
+
+  results <- if (arrSize > 0) {
+    lapply(0:(arrSize - 1),
+          function(index) {
+            obj <- callJMethod(jList, "get", as.integer(index))
+
+            # Assume it is either an R object or a Java obj ref.
+            if (inherits(obj, "jobj")) {
+              if (isInstanceOf(obj, "scala.Tuple2")) {
+                # JavaPairRDD[Array[Byte], Array[Byte]].
+
+                keyBytes = callJMethod(obj, "_1")
+                valBytes = callJMethod(obj, "_2")
+                res <- list(unserialize(keyBytes),
+                  unserialize(valBytes))
+              } else {
+                stop(paste("utils.R: convertJListToRList only supports",
+                  "RDD[Array[Byte]] and",
+                  "JavaPairRDD[Array[Byte], Array[Byte]] for now"))
+              }
+            } else {
+              if (inherits(obj, "raw")) {
+                if (serializedMode == "byte") {
+                  # RDD[Array[Byte]]. `obj` is a whole partition.
+                  res <- unserialize(obj)
+                  # For serialized datasets, `obj` (and `rRaw`) here corresponds to
+                  # one whole partition dense-packed together. We deserialize the
+                  # whole partition first, then cap the number of elements to be returned.
+                } else if (serializedMode == "row") {
+                  res <- readRowList(obj)
+                  # For DataFrames that have been converted to RRDDs, we call readRowList
+                  # which will read in each row of the RRDD as a list and deserialize
+                  # each element.
+                  flatten <<- FALSE
+                  # Use global assignment to change the flatten flag. This means
+                  # we don't have to worry about the default argument in other functions
+                  # e.g. collect
+                }
+                # TODO: is it possible to distinguish element boundary so that we can
+                # unserialize only what we need?
+                if (!is.null(logicalUpperBound)) {
+                  res <- head(res, n = logicalUpperBound)
+                }
+              } else {
+                # obj is of a primitive Java type, is simplified to R's
+                # corresponding type.
+                res <- list(obj)
+              }
+            }
+            res
+          })
+  } else {
+    list()
+  }
+
+  if (flatten) {
+    as.list(unlist(results, recursive = FALSE))
+  } else {
+    as.list(results)
+  }
+}
+
+# Returns TRUE if `name` refers to an RDD in the given environment `env`
+isRDD <- function(name, env) {
+  obj <- get(name, envir = env)
+  inherits(obj, "RDD")
+}
+
+#' Compute the hashCode of an object
+#'
+#' Java-style function to compute the hashCode for the given object. Returns
+#' an integer value.
+#'
+#' @details
+#' This only works for integer, numeric and character types right now.
+#'
+#' @param key the object to be hashed
+#' @return the hash code as an integer
+#' @export
+#' @examples
+#' hashCode(1L) # 1
+#' hashCode(1.0) # 1072693248
+#' hashCode("1") # 49
+hashCode <- function(key) {
+  if (class(key) == "integer") {
+    as.integer(key[[1]])
+  } else if (class(key) == "numeric") {
+    # Convert the double to long and then calculate the hash code
+    rawVec <- writeBin(key[[1]], con = raw())
+    intBits <- packBits(rawToBits(rawVec), "integer")
+    as.integer(bitwXor(intBits[2], intBits[1]))
+  } else if (class(key) == "character") {
+    .Call("stringHashCode", key)
+  } else {
+    warning(paste("Could not hash object, returning 0", sep = ""))
+    as.integer(0)
+  }
+}
+
+# Create a new RDD with serializedMode == "byte".
+# Return itself if already in "byte" format.
+serializeToBytes <- function(rdd) {
+  if (!inherits(rdd, "RDD")) {
+    stop("Argument 'rdd' is not an RDD type.")
+  }
+  if (getSerializedMode(rdd) != "byte") {
+    ser.rdd <- lapply(rdd, function(x) { x })
+    return(ser.rdd)
+  } else {
+    return(rdd)
+  }
+}
+
+# Create a new RDD with serializedMode == "string".
+# Return itself if already in "string" format.
+serializeToString <- function(rdd) {
+  if (!inherits(rdd, "RDD")) {
+    stop("Argument 'rdd' is not an RDD type.")
+  }
+  if (getSerializedMode(rdd) != "string") {
+    ser.rdd <- lapply(rdd, function(x) { toString(x) })
+    # force it to create jrdd using "string"
+    getJRDD(ser.rdd, serializedMode = "string")
+    return(ser.rdd)
+  } else {
+    return(rdd)
+  }
+}
+
+# Fast append to list by using an accumulator.
+# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
+#
+# The accumulator should has three fields size, counter and data.
+# This function amortizes the allocation cost by doubling
+# the size of the list every time it fills up.
+addItemToAccumulator <- function(acc, item) {
+  if(acc$counter == acc$size) {
+    acc$size <- acc$size * 2
+    length(acc$data) <- acc$size
+  }
+  acc$counter <- acc$counter + 1
+  acc$data[[acc$counter]] <- item
+}
+
+initAccumulator <- function() {
+  acc <- new.env()
+  acc$counter <- 0
+  acc$data <- list(NULL)
+  acc$size <- 1
+  acc
+}
+
+# Utility function to sort a list of key value pairs
+# Used in unit tests
+sortKeyValueList <- function(kv_list, decreasing = FALSE) {
+  keys <- sapply(kv_list, function(x) x[[1]])
+  kv_list[order(keys, decreasing = decreasing)]
+}
+
+# Utility function to generate compact R lists from grouped rdd
+# Used in Join-family functions
+# param:
+#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
+#   cnull Boolean list where each element determines whether the corresponding list should
+#         be converted to list(NULL)
+genCompactLists <- function(tagged_list, cnull) {
+  len <- length(tagged_list)
+  lists <- list(vector("list", len), vector("list", len))
+  index <- list(1, 1)
+
+  for (x in tagged_list) {
+    tag <- x[[1]]
+    idx <- index[[tag]]
+    lists[[tag]][[idx]] <- x[[2]]
+    index[[tag]] <- idx + 1
+  }
+
+  len <- lapply(index, function(x) x - 1)
+  for (i in (1:2)) {
+    if (cnull[[i]] && len[[i]] == 0) {
+      lists[[i]] <- list(NULL)
+    } else {
+      length(lists[[i]]) <- len[[i]]
+    }
+  }
+
+  lists
+}
+
+# Utility function to merge compact R lists
+# Used in Join-family functions
+# param:
+#   left/right Two compact lists ready for Cartesian product
+mergeCompactLists <- function(left, right) {
+  result <- list()
+  length(result) <- length(left) * length(right)
+  index <- 1
+  for (i in left) {
+    for (j in right) {
+      result[[index]] <- list(i, j)
+      index <- index + 1
+    }
+  }
+  result
+}
+
+# Utility function to wrapper above two operations
+# Used in Join-family functions
+# param (same as genCompactLists):
+#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
+#   cnull Boolean list where each element determines whether the corresponding list should
+#         be converted to list(NULL)
+joinTaggedList <- function(tagged_list, cnull) {
+  lists <- genCompactLists(tagged_list, cnull)
+  mergeCompactLists(lists[[1]], lists[[2]])
+}
+
+# Utility function to reduce a key-value list with predicate
+# Used in *ByKey functions
+# param
+#   pair key-value pair
+#   keys/vals env of key/value with hashes
+#   updateOrCreatePred predicate function
+#   updateFn update or merge function for existing pair, similar with `mergeVal` @combineByKey
+#   createFn create function for new pair, similar with `createCombiner` @combinebykey
+updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
+  # assume hashVal bind to `$hash`, key/val with index 1/2
+  hashVal <- pair$hash
+  key <- pair[[1]]
+  val <- pair[[2]]
+  if (updateOrCreatePred(pair)) {
+    assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
+  } else {
+    assign(hashVal, do.call(createFn, list(val)), envir = vals)
+    assign(hashVal, key, envir = keys)
+  }
+}
+
+# Utility function to convert key&values envs into key-val list
+convertEnvsToList <- function(keys, vals) {
+  lapply(ls(keys),
+         function(name) {
+           list(keys[[name]], vals[[name]])
+         })
+}
+
+# Utility function to capture the varargs into environment object
+varargsToEnv <- function(...) {
+  pairs <- as.list(substitute(list(...)))[-1L]
+  env <- new.env()
+  for (name in names(pairs)) {
+    env[[name]] <- pairs[[name]]
+  }
+  env
+}
+
+getStorageLevel <- function(newLevel = c("DISK_ONLY",
+                                         "DISK_ONLY_2",
+                                         "MEMORY_AND_DISK",
+                                         "MEMORY_AND_DISK_2",
+                                         "MEMORY_AND_DISK_SER",
+                                         "MEMORY_AND_DISK_SER_2",
+                                         "MEMORY_ONLY",
+                                         "MEMORY_ONLY_2",
+                                         "MEMORY_ONLY_SER",
+                                         "MEMORY_ONLY_SER_2",
+                                         "OFF_HEAP")) {
+  match.arg(newLevel)
+  storageLevel <- switch(newLevel,
+                         "DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
+                         "DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
+                         "MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
+                         "MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
+                         "MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
+                         "MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
+                         "MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
+                         "MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
+                         "MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
+                         "MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
+                         "OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
+}
+
+# Utility function for functions where an argument needs to be integer but we want to allow
+# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
+numToInt <- function(num) {
+  if (as.integer(num) != num) {
+    warning(paste("Coercing", as.list(sys.call())[[2]], "to integer."))
+  }
+  as.integer(num)
+}
+
+# create a Seq in JVM
+toSeq <- function(...) {
+  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", list(...))
+}
+
+# create a Seq in JVM from a list
+listToSeq <- function(l) {
+  callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", l)
+}
+
+# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
+# user defined function (UDF), and to examine variables in the UDF to decide 
+# if their values should be included in the new function environment.
+# param
+#   node The current AST node in the traversal.
+#   oldEnv The original function environment.
+#   defVars An Accumulator of variables names defined in the function's calling environment,
+#           including function argument and local variable names.
+#   checkedFunc An environment of function objects examined during cleanClosure. It can 
+#               be considered as a "name"-to-"list of functions" mapping.
+#   newEnv A new function environment to store necessary function dependencies, an output argument.
+processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
+  nodeLen <- length(node)
+  
+  if (nodeLen > 1 && typeof(node) == "language") {
+    # Recursive case: current AST node is an internal node, check for its children. 
+    if (length(node[[1]]) > 1) {
+      for (i in 1:nodeLen) {
+        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
+      }
+    } else {  # if node[[1]] is length of 1, check for some R special functions.
+      nodeChar <- as.character(node[[1]])
+      if (nodeChar == "{" || nodeChar == "(") {  # Skip start symbol.
+        for (i in 2:nodeLen) {
+          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
+        }
+      } else if (nodeChar == "<-" || nodeChar == "=" || 
+                   nodeChar == "<<-") { # Assignment Ops.
+        defVar <- node[[2]]
+        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
+          # Add the defined variable name into defVars.
+          addItemToAccumulator(defVars, as.character(defVar))
+        } else {
+          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
+        }
+        for (i in 3:nodeLen) {
+          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
+        }
+      } else if (nodeChar == "function") {  # Function definition.
+        # Add parameter names.
+        newArgs <- names(node[[2]])
+        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
+        for (i in 3:nodeLen) {
+          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
+        }
+      } else if (nodeChar == "$") {  # Skip the field.
+        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
+      } else if (nodeChar == "::" || nodeChar == ":::") {
+        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
+      } else {
+        for (i in 1:nodeLen) {
+          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
+        }
+      }
+    }
+  } else if (nodeLen == 1 && 
+               (typeof(node) == "symbol" || typeof(node) == "language")) {
+    # Base case: current AST node is a leaf node and a symbol or a function call.
+    nodeChar <- as.character(node)
+    if (!nodeChar %in% defVars$data) {  # Not a function parameter or local variable.
+      func.env <- oldEnv
+      topEnv <- parent.env(.GlobalEnv)
+      # Search in function environment, and function's enclosing environments 
+      # up to global environment. There is no need to look into package environments
+      # above the global or namespace environment that is not SparkR below the global, 
+      # as they are assumed to be loaded on workers.
+      while (!identical(func.env, topEnv)) {
+        # Namespaces other than "SparkR" will not be searched.
+        if (!isNamespace(func.env) || 
+              (getNamespaceName(func.env) == "SparkR" && 
+              !(nodeChar %in% getNamespaceExports("SparkR")))) {  # Only include SparkR internals.
+          # Set parameter 'inherits' to FALSE since we do not need to search in
+          # attached package environments.
+          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
+                       error = function(e) { FALSE })) {
+            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
+            if (is.function(obj)) {  # If the node is a function call.
+              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F, 
+                               ifnotfound = list(list(NULL)))[[1]]
+              found <- sapply(funcList, function(func) {
+                ifelse(identical(func, obj), TRUE, FALSE)
+              })
+              if (sum(found) > 0) {  # If function has been examined, ignore.
+                break
+              }
+              # Function has not been examined, record it and recursively clean its closure.
+              assign(nodeChar, 
+                     if (is.null(funcList[[1]])) {
+                       list(obj)
+                     } else {
+                       append(funcList, obj)
+                     },
+                     envir = checkedFuncs)
+              obj <- cleanClosure(obj, checkedFuncs)
+            }
+            assign(nodeChar, obj, envir = newEnv)
+            break
+          }
+        }
+        
+        # Continue to search in enclosure.
+        func.env <- parent.env(func.env)
+      }
+    }
+  }
+}
+
+# Utility function to get user defined function (UDF) dependencies (closure). 
+# More specifically, this function captures the values of free variables defined 
+# outside a UDF, and stores them in the function's environment.
+# param
+#   func A function whose closure needs to be captured.
+#   checkedFunc An environment of function objects examined during cleanClosure. It can be
+#               considered as a "name"-to-"list of functions" mapping.
+# return value
+#   a new version of func that has an correct environment (closure).
+cleanClosure <- function(func, checkedFuncs = new.env()) {
+  if (is.function(func)) {
+    newEnv <- new.env(parent = .GlobalEnv)
+    func.body <- body(func)
+    oldEnv <- environment(func)
+    # defVars is an Accumulator of variables names defined in the function's calling 
+    # environment. First, function's arguments are added to defVars.
+    defVars <- initAccumulator()
+    argNames <- names(as.list(args(func)))
+    for (i in 1:(length(argNames) - 1)) {  # Remove the ending NULL in pairlist.
+      addItemToAccumulator(defVars, argNames[i])
+    }
+    # Recursively examine variables in the function body.
+    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
+    environment(func) <- newEnv
+  }
+  func
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/R/zzz.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/zzz.R b/R/pkg/R/zzz.R
new file mode 100644
index 0000000..80d796d
--- /dev/null
+++ b/R/pkg/R/zzz.R
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+.onLoad <- function(libname, pkgname) {
+  sparkR.onLoad(libname, pkgname)
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/profile/general.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R
new file mode 100644
index 0000000..8fe711b
--- /dev/null
+++ b/R/pkg/inst/profile/general.R
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+.First <- function() {
+  home <- Sys.getenv("SPARK_HOME")
+  .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+  Sys.setenv(NOAWT=1)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/profile/shell.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/profile/shell.R b/R/pkg/inst/profile/shell.R
new file mode 100644
index 0000000..7a7f203
--- /dev/null
+++ b/R/pkg/inst/profile/shell.R
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+.First <- function() {
+  home <- Sys.getenv("SPARK_HOME")
+  .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
+  Sys.setenv(NOAWT=1)
+
+  library(utils)
+  library(SparkR)
+  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
+  assign("sc", sc, envir=.GlobalEnv)
+  sqlCtx <- sparkRSQL.init(sc)
+  assign("sqlCtx", sqlCtx, envir=.GlobalEnv)
+  cat("\n Welcome to SparkR!")
+  cat("\n Spark context is available as sc, SQL context is available as sqlCtx\n")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_binaryFile.R b/R/pkg/inst/tests/test_binaryFile.R
new file mode 100644
index 0000000..4bb5f58
--- /dev/null
+++ b/R/pkg/inst/tests/test_binaryFile.R
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions on binary files")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+mockFile = c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+  saveAsObjectFile(rdd, fileName2)
+  rdd <- objectFile(sc, fileName2)
+  expect_equal(collect(rdd), as.list(mockFile))
+
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+
+  l <- list(1, 2, 3)
+  rdd <- parallelize(sc, l)
+  saveAsObjectFile(rdd, fileName)
+  rdd <- objectFile(sc, fileName)
+  expect_equal(collect(rdd), l)
+
+  unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+
+  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  counts <- reduceByKey(wordCount, "+", 2L)
+  
+  saveAsObjectFile(counts, fileName2)
+  counts <- objectFile(sc, fileName2)
+    
+  output <- collect(counts)
+  expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+                    list("is", 2))
+  expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+  
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+  fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+  fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+
+  rdd1 <- parallelize(sc, "Spark is pretty.")
+  saveAsObjectFile(rdd1, fileName1)
+  rdd2 <- parallelize(sc, "Spark is awesome.")
+  saveAsObjectFile(rdd2, fileName2)
+
+  rdd <- objectFile(sc, c(fileName1, fileName2))
+  expect_true(count(rdd) == 2)
+
+  unlink(fileName1, recursive = TRUE)
+  unlink(fileName2, recursive = TRUE)
+})
+

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_binary_function.R b/R/pkg/inst/tests/test_binary_function.R
new file mode 100644
index 0000000..c15553b
--- /dev/null
+++ b/R/pkg/inst/tests/test_binary_function.R
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("binary functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+# File content
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("union on two RDDs", {
+  actual <- collect(unionRDD(rdd, rdd))
+  expect_equal(actual, as.list(rep(nums, 2)))
+  
+  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines(mockFile, fileName)
+
+  text.rdd <- textFile(sc, fileName)
+  union.rdd <- unionRDD(rdd, text.rdd)
+  actual <- collect(union.rdd)
+  expect_equal(actual, c(as.list(nums), mockFile))
+  expect_true(getSerializedMode(union.rdd) == "byte")
+
+  rdd<- map(text.rdd, function(x) {x})
+  union.rdd <- unionRDD(rdd, text.rdd)
+  actual <- collect(union.rdd)
+  expect_equal(actual, as.list(c(mockFile, mockFile)))
+  expect_true(getSerializedMode(union.rdd) == "byte")
+
+  unlink(fileName)
+})
+
+test_that("cogroup on two RDDs", {
+  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) 
+  actual <- collect(cogroup.rdd)
+  expect_equal(actual, 
+               list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
+  
+  rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
+  rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
+  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) 
+  actual <- collect(cogroup.rdd)
+
+  expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_broadcast.R b/R/pkg/inst/tests/test_broadcast.R
new file mode 100644
index 0000000..fee91a4
--- /dev/null
+++ b/R/pkg/inst/tests/test_broadcast.R
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("broadcast variables")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rrdd <- parallelize(sc, nums, 2L)
+
+test_that("using broadcast variable", {
+  randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+  randomMatBr <- broadcast(sc, randomMat)
+
+  useBroadcast <- function(x) {
+    sum(value(randomMatBr) * x)
+  }
+  actual <- collect(lapply(rrdd, useBroadcast))
+  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+  expect_equal(actual, expected)
+})
+
+test_that("without using broadcast variable", {
+  randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+
+  useBroadcast <- function(x) {
+    sum(randomMat * x)
+  }
+  actual <- collect(lapply(rrdd, useBroadcast))
+  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+  expect_equal(actual, expected)
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_context.R b/R/pkg/inst/tests/test_context.R
new file mode 100644
index 0000000..e4aab37
--- /dev/null
+++ b/R/pkg/inst/tests/test_context.R
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("test functions in sparkR.R")
+
+test_that("repeatedly starting and stopping SparkR", {
+  for (i in 1:4) {
+    sc <- sparkR.init()
+    rdd <- parallelize(sc, 1:20, 2L)
+    expect_equal(count(rdd), 20)
+    sparkR.stop()
+  }
+})
+
+test_that("rdd GC across sparkR.stop", {
+  sparkR.stop()
+  sc <- sparkR.init() # sc should get id 0
+  rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
+  rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
+  sparkR.stop()
+
+  sc <- sparkR.init() # sc should get id 0 again
+
+  # GC rdd1 before creating rdd3 and rdd2 after
+  rm(rdd1)
+  gc()
+
+  rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
+  rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
+
+  rm(rdd2)
+  gc()
+
+  count(rdd3)
+  count(rdd4)
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_includePackage.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_includePackage.R b/R/pkg/inst/tests/test_includePackage.R
new file mode 100644
index 0000000..8152b44
--- /dev/null
+++ b/R/pkg/inst/tests/test_includePackage.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("include R packages")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rdd <- parallelize(sc, nums, 2L)
+
+test_that("include inside function", {
+  # Only run the test if plyr is installed.
+  if ("plyr" %in% rownames(installed.packages())) {
+    suppressPackageStartupMessages(library(plyr))
+    generateData <- function(x) {
+      suppressPackageStartupMessages(library(plyr))
+      attach(airquality)
+      result <- transform(Ozone, logOzone = log(Ozone))
+      result
+    }
+
+    data <- lapplyPartition(rdd, generateData)
+    actual <- collect(data)
+  }
+})
+
+test_that("use include package", {
+  # Only run the test if plyr is installed.
+  if ("plyr" %in% rownames(installed.packages())) {
+    suppressPackageStartupMessages(library(plyr))
+    generateData <- function(x) {
+      attach(airquality)
+      result <- transform(Ozone, logOzone = log(Ozone))
+      result
+    }
+
+    includePackage(sc, plyr)
+    data <- lapplyPartition(rdd, generateData)
+    actual <- collect(data)
+  }
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_parallelize_collect.R b/R/pkg/inst/tests/test_parallelize_collect.R
new file mode 100644
index 0000000..fff0286
--- /dev/null
+++ b/R/pkg/inst/tests/test_parallelize_collect.R
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("parallelize() and collect()")
+
+# Mock data
+numVector <- c(-10:97)
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
+strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
+               "violated, but I'm not. No, in fact, I think this is a friendly",
+               "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+               "I really, really do.")
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+                "other times it helps me control the chaos.",
+                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+                "raising me. But they're both dead now. I didn't kill them. Honest.")
+
+numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
+strPairs <- list(list(strList, strList), list(strList, strList))
+
+# JavaSparkContext handle
+jsc <- sparkR.init()
+
+# Tests
+
+test_that("parallelize() on simple vectors and lists returns an RDD", {
+  numVectorRDD <- parallelize(jsc, numVector, 1)
+  numVectorRDD2 <- parallelize(jsc, numVector, 10)
+  numListRDD <- parallelize(jsc, numList, 1)
+  numListRDD2 <- parallelize(jsc, numList, 4)
+  strVectorRDD <- parallelize(jsc, strVector, 2)
+  strVectorRDD2 <- parallelize(jsc, strVector, 3)
+  strListRDD <- parallelize(jsc, strList, 4)
+  strListRDD2 <- parallelize(jsc, strList, 1)
+
+  rdds <- c(numVectorRDD,
+             numVectorRDD2,
+             numListRDD,
+             numListRDD2,
+             strVectorRDD,
+             strVectorRDD2,
+             strListRDD,
+             strListRDD2)
+
+  for (rdd in rdds) {
+    expect_true(inherits(rdd, "RDD"))
+    expect_true(.hasSlot(rdd, "jrdd")
+                && inherits(rdd@jrdd, "jobj")
+                && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
+  }
+})
+
+test_that("collect(), following a parallelize(), gives back the original collections", {
+  numVectorRDD <- parallelize(jsc, numVector, 10)
+  expect_equal(collect(numVectorRDD), as.list(numVector))
+
+  numListRDD <- parallelize(jsc, numList, 1)
+  numListRDD2 <- parallelize(jsc, numList, 4)
+  expect_equal(collect(numListRDD), as.list(numList))
+  expect_equal(collect(numListRDD2), as.list(numList))
+
+  strVectorRDD <- parallelize(jsc, strVector, 2)
+  strVectorRDD2 <- parallelize(jsc, strVector, 3)
+  expect_equal(collect(strVectorRDD), as.list(strVector))
+  expect_equal(collect(strVectorRDD2), as.list(strVector))
+
+  strListRDD <- parallelize(jsc, strList, 4)
+  strListRDD2 <- parallelize(jsc, strList, 1)
+  expect_equal(collect(strListRDD), as.list(strList))
+  expect_equal(collect(strListRDD2), as.list(strList))
+})
+
+test_that("regression: collect() following a parallelize() does not drop elements", {
+  # 10 %/% 6 = 1, ceiling(10 / 6) = 2
+  collLen <- 10
+  numPart <- 6
+  expected <- runif(collLen)
+  actual <- collect(parallelize(jsc, expected, numPart))
+  expect_equal(actual, as.list(expected))
+})
+
+test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
+  # use the pairwise logical to indicate pairwise data
+  numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
+  numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
+  numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
+  expect_equal(collect(numPairsRDDD1), numPairs)
+  expect_equal(collect(numPairsRDDD2), numPairs)
+  expect_equal(collect(numPairsRDDD3), numPairs)
+  # can also leave out the parameter name, if the params are supplied in order
+  strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
+  strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
+  expect_equal(collect(strPairsRDDD1), strPairs)
+  expect_equal(collect(strPairsRDDD2), strPairs)
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R
new file mode 100644
index 0000000..f75e081
--- /dev/null
+++ b/R/pkg/inst/tests/test_rdd.R
@@ -0,0 +1,644 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("basic RDD functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+test_that("get number of partitions in RDD", {
+  expect_equal(numPartitions(rdd), 2)
+  expect_equal(numPartitions(intRdd), 2)
+})
+
+test_that("first on RDD", {
+  expect_true(first(rdd) == 1)
+  newrdd <- lapply(rdd, function(x) x + 1)
+  expect_true(first(newrdd) == 2)  
+})
+
+test_that("count and length on RDD", {
+   expect_equal(count(rdd), 10)
+   expect_equal(length(rdd), 10)
+})
+
+test_that("count by values and keys", {
+  mods <- lapply(rdd, function(x) { x %% 3 })
+  actual <- countByValue(mods)
+  expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+  
+  actual <- countByKey(intRdd)
+  expected <- list(list(2L, 2L), list(1L, 2L))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("lapply on RDD", {
+  multiples <- lapply(rdd, function(x) { 2 * x })
+  actual <- collect(multiples)
+  expect_equal(actual, as.list(nums * 2))
+})
+
+test_that("lapplyPartition on RDD", {
+  sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
+  actual <- collect(sums)
+  expect_equal(actual, list(15, 40))
+})
+
+test_that("mapPartitions on RDD", {
+  sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
+  actual <- collect(sums)
+  expect_equal(actual, list(15, 40))
+})
+
+test_that("flatMap() on RDDs", {
+  flat <- flatMap(intRdd, function(x) { list(x, x) })
+  actual <- collect(flat)
+  expect_equal(actual, rep(intPairs, each=2))
+})
+
+test_that("filterRDD on RDD", {
+  filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
+  actual <- collect(filtered.rdd)
+  expect_equal(actual, list(2, 4, 6, 8, 10))
+  
+  filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
+  actual <- collect(filtered.rdd)
+  expect_equal(actual, list(list(1L, -1)))
+  
+  # Filter out all elements.
+  filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
+  actual <- collect(filtered.rdd)
+  expect_equal(actual, list())
+})
+
+test_that("lookup on RDD", {
+  vals <- lookup(intRdd, 1L)
+  expect_equal(vals, list(-1, 200))
+  
+  vals <- lookup(intRdd, 3L)
+  expect_equal(vals, list())
+})
+
+test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
+  rdd2 <- rdd
+  for (i in 1:12)
+    rdd2 <- lapplyPartitionsWithIndex(
+              rdd2, function(split, part) {
+                part <- as.list(unlist(part) * split + i)
+              })
+  rdd2 <- lapply(rdd2, function(x) x + x)
+  actual <- collect(rdd2)
+  expected <- list(24, 24, 24, 24, 24, 
+                   168, 170, 172, 174, 176)
+  expect_equal(actual, expected)
+})
+
+test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
+  # RDD
+  rdd2 <- rdd
+  # PipelinedRDD
+  rdd2 <- lapplyPartitionsWithIndex(
+            rdd2,
+            function(split, part) {
+              part <- as.list(unlist(part) * split)
+            })
+
+  cache(rdd2)
+  expect_true(rdd2@env$isCached)
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+
+  unpersist(rdd2)
+  expect_false(rdd2@env$isCached)
+
+  persist(rdd2, "MEMORY_AND_DISK")
+  expect_true(rdd2@env$isCached)
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+
+  unpersist(rdd2)
+  expect_false(rdd2@env$isCached)
+
+  setCheckpointDir(sc, "checkpoints")
+  checkpoint(rdd2)
+  expect_true(rdd2@env$isCheckpointed)
+
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+  expect_false(rdd2@env$isCheckpointed)
+
+  # make sure the data is collectable
+  collect(rdd2)
+
+  unlink("checkpoints")
+})
+
+test_that("reduce on RDD", {
+  sum <- reduce(rdd, "+")
+  expect_equal(sum, 55)
+
+  # Also test with an inline function
+  sumInline <- reduce(rdd, function(x, y) { x + y })
+  expect_equal(sumInline, 55)
+})
+
+test_that("lapply with dependency", {
+  fa <- 5
+  multiples <- lapply(rdd, function(x) { fa * x })
+  actual <- collect(multiples)
+
+  expect_equal(actual, as.list(nums * 5))
+})
+
+test_that("lapplyPartitionsWithIndex on RDDs", {
+  func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) }
+  actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+  expect_equal(actual, list(list(0, 15), list(1, 40)))
+
+  pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
+  partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
+  mkTup <- function(splitIndex, part) { list(splitIndex, part) }
+  actual <- collect(lapplyPartitionsWithIndex(
+                      partitionBy(pairsRDD, 2L, partitionByParity),
+                      mkTup),
+                    FALSE)
+  expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
+                            list(1, list(list(4, 8)))))
+})
+
+test_that("sampleRDD() on RDDs", {
+  expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+})
+
+test_that("takeSample() on RDDs", {
+  # ported from RDDSuite.scala, modified seeds
+  data <- parallelize(sc, 1:100, 2L)
+  for (seed in 4:5) {
+    s <- takeSample(data, FALSE, 20L, seed)
+    expect_equal(length(s), 20L)
+    expect_equal(length(unique(s)), 20L)
+    for (elem in s) {
+      expect_true(elem >= 1 && elem <= 100)
+    }
+  }
+  for (seed in 4:5) {
+    s <- takeSample(data, FALSE, 200L, seed)
+    expect_equal(length(s), 100L)
+    expect_equal(length(unique(s)), 100L)
+    for (elem in s) {
+      expect_true(elem >= 1 && elem <= 100)
+    }
+  }
+  for (seed in 4:5) {
+    s <- takeSample(data, TRUE, 20L, seed)
+    expect_equal(length(s), 20L)
+    for (elem in s) {
+      expect_true(elem >= 1 && elem <= 100)
+    }
+  }
+  for (seed in 4:5) {
+    s <- takeSample(data, TRUE, 100L, seed)
+    expect_equal(length(s), 100L)
+    # Chance of getting all distinct elements is astronomically low, so test we
+    # got < 100
+    expect_true(length(unique(s)) < 100L)
+  }
+  for (seed in 4:5) {
+    s <- takeSample(data, TRUE, 200L, seed)
+    expect_equal(length(s), 200L)
+    # Chance of getting all distinct elements is still quite low, so test we
+    # got < 100
+    expect_true(length(unique(s)) < 100L)
+  }
+})
+
+test_that("mapValues() on pairwise RDDs", {
+  multiples <- mapValues(intRdd, function(x) { x * 2 })
+  actual <- collect(multiples)
+  expected <- lapply(intPairs, function(x) {
+    list(x[[1]], x[[2]] * 2)
+  })
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("flatMapValues() on pairwise RDDs", {
+  l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+  actual <- collect(flatMapValues(l, function(x) { x }))
+  expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))
+  
+  # Generate x to x+1 for every value
+  actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) }))
+  expect_equal(actual, 
+               list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
+                    list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
+})
+
+test_that("reduceByKeyLocally() on PairwiseRDDs", {
+  pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
+  actual <- reduceByKeyLocally(pairs, "+")
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list(1, 6), list(1.1, 3))))
+
+  pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
+                                list("bb", 5)), 4L)
+  actual <- reduceByKeyLocally(pairs, "+")
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
+})
+
+test_that("distinct() on RDDs", {
+  nums.rep2 <- rep(1:10, 2)
+  rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
+  uniques <- distinct(rdd.rep2)
+  actual <- sort(unlist(collect(uniques)))
+  expect_equal(actual, nums)
+})
+
+test_that("maximum() on RDDs", {
+  max <- maximum(rdd)
+  expect_equal(max, 10)
+})
+
+test_that("minimum() on RDDs", {
+  min <- minimum(rdd)
+  expect_equal(min, 1)
+})
+
+test_that("sumRDD() on RDDs", {
+  sum <- sumRDD(rdd)
+  expect_equal(sum, 55)
+})
+
+test_that("keyBy on RDDs", {
+  func <- function(x) { x*x }
+  keys <- keyBy(rdd, func)
+  actual <- collect(keys)
+  expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
+})
+
+test_that("repartition/coalesce on RDDs", {
+  rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
+
+  # repartition
+  r1 <- repartition(rdd, 2)
+  expect_equal(numPartitions(r1), 2L)
+  count <- length(collectPartition(r1, 0L))
+  expect_true(count >= 8 && count <= 12)
+
+  r2 <- repartition(rdd, 6)
+  expect_equal(numPartitions(r2), 6L)
+  count <- length(collectPartition(r2, 0L))
+  expect_true(count >=0 && count <= 4)
+
+  # coalesce
+  r3 <- coalesce(rdd, 1)
+  expect_equal(numPartitions(r3), 1L)
+  count <- length(collectPartition(r3, 0L))
+  expect_equal(count, 20)
+})
+
+test_that("sortBy() on RDDs", {
+  sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
+  actual <- collect(sortedRdd)
+  expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
+
+  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+  sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
+  actual <- collect(sortedRdd2)
+  expect_equal(actual, as.list(nums))
+})
+
+test_that("takeOrdered() on RDDs", {
+  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  rdd <- parallelize(sc, l)
+  actual <- takeOrdered(rdd, 6L)
+  expect_equal(actual, as.list(sort(unlist(l)))[1:6])
+
+  l <- list("e", "d", "c", "d", "a")
+  rdd <- parallelize(sc, l)
+  actual <- takeOrdered(rdd, 3L)
+  expect_equal(actual, as.list(sort(unlist(l)))[1:3])
+})
+
+test_that("top() on RDDs", {
+  l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  rdd <- parallelize(sc, l)
+  actual <- top(rdd, 6L)
+  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
+  
+  l <- list("e", "d", "c", "d", "a")
+  rdd <- parallelize(sc, l)
+  actual <- top(rdd, 3L)
+  expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
+})
+
+test_that("fold() on RDDs", {
+  actual <- fold(rdd, 0, "+")
+  expect_equal(actual, Reduce("+", nums, 0))
+  
+  rdd <- parallelize(sc, list())
+  actual <- fold(rdd, 0, "+")
+  expect_equal(actual, 0)
+})
+
+test_that("aggregateRDD() on RDDs", {
+  rdd <- parallelize(sc, list(1, 2, 3, 4))
+  zeroValue <- list(0, 0)
+  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+  expect_equal(actual, list(10, 4))
+  
+  rdd <- parallelize(sc, list())
+  actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+  expect_equal(actual, list(0, 0))
+})
+
+test_that("zipWithUniqueId() on RDDs", {
+  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+  actual <- collect(zipWithUniqueId(rdd))
+  expected <- list(list("a", 0), list("b", 3), list("c", 1), 
+                   list("d", 4), list("e", 2))
+  expect_equal(actual, expected)
+  
+  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+  actual <- collect(zipWithUniqueId(rdd))
+  expected <- list(list("a", 0), list("b", 1), list("c", 2), 
+                   list("d", 3), list("e", 4))
+  expect_equal(actual, expected)
+})
+
+test_that("zipWithIndex() on RDDs", {
+  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+  actual <- collect(zipWithIndex(rdd))
+  expected <- list(list("a", 0), list("b", 1), list("c", 2), 
+                   list("d", 3), list("e", 4))
+  expect_equal(actual, expected)
+  
+  rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+  actual <- collect(zipWithIndex(rdd))
+  expected <- list(list("a", 0), list("b", 1), list("c", 2), 
+                   list("d", 3), list("e", 4))
+  expect_equal(actual, expected)
+})
+
+test_that("glom() on RDD", {
+  rdd <- parallelize(sc, as.list(1:4), 2L)
+  actual <- collect(glom(rdd))
+  expect_equal(actual, list(list(1, 2), list(3, 4)))
+})
+
+test_that("keys() on RDDs", {
+  keys <- keys(intRdd)
+  actual <- collect(keys)
+  expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
+})
+
+test_that("values() on RDDs", {
+  values <- values(intRdd)
+  actual <- collect(values)
+  expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
+})
+
+test_that("pipeRDD() on RDDs", {
+  actual <- collect(pipeRDD(rdd, "more"))
+  expected <- as.list(as.character(1:10))
+  expect_equal(actual, expected)
+  
+  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
+  actual <- collect(pipeRDD(trailed.rdd, "sort"))
+  expected <- list("", "1", "2", "3")
+  expect_equal(actual, expected)
+  
+  rev.nums <- 9:0
+  rev.rdd <- parallelize(sc, rev.nums, 2L)
+  actual <- collect(pipeRDD(rev.rdd, "sort"))
+  expected <- as.list(as.character(c(5:9, 0:4)))
+  expect_equal(actual, expected)
+})
+
+test_that("zipRDD() on RDDs", {
+  rdd1 <- parallelize(sc, 0:4, 2)
+  rdd2 <- parallelize(sc, 1000:1004, 2)
+  actual <- collect(zipRDD(rdd1, rdd2))
+  expect_equal(actual,
+               list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
+  
+  mockFile = c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+  writeLines(mockFile, fileName)
+  
+  rdd <- textFile(sc, fileName, 1)
+  actual <- collect(zipRDD(rdd, rdd))
+  expected <- lapply(mockFile, function(x) { list(x ,x) })
+  expect_equal(actual, expected)
+
+  rdd1 <- parallelize(sc, 0:1, 1)
+  actual <- collect(zipRDD(rdd1, rdd))
+  expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
+  expect_equal(actual, expected)
+
+  rdd1 <- map(rdd, function(x) { x })
+  actual <- collect(zipRDD(rdd, rdd1))
+  expected <- lapply(mockFile, function(x) { list(x, x) })
+  expect_equal(actual, expected)
+ 
+  unlink(fileName)
+})
+
+test_that("join() on pairwise RDDs", {
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+  rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+  actual <- collect(join(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+  rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+  actual <- collect(join(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
+
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+  actual <- collect(join(rdd1, rdd2, 2L))
+  expect_equal(actual, list())
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+  actual <- collect(join(rdd1, rdd2, 2L))
+  expect_equal(actual, list())
+})
+
+test_that("leftOuterJoin() on pairwise RDDs", {
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+  rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+  rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+  expected <-  list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+  actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+})
+
+test_that("rightOuterJoin() on pairwise RDDs", {
+  rdd1 <- parallelize(sc, list(list(1,2), list(1,3)))
+  rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list("a",2), list("a",3)))
+  rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+  actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("fullOuterJoin() on pairwise RDDs", {
+  rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
+  rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
+  rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+
+  rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+  rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+  rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+  rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+  actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("sortByKey() on pairwise RDDs", {
+  numPairsRdd <- map(rdd, function(x) { list (x, x) })
+  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
+  actual <- collect(sortedRdd)
+  numPairs <- lapply(nums, function(x) { list (x, x) })
+  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
+
+  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+  numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
+  sortedRdd2 <- sortByKey(numPairsRdd2)
+  actual <- collect(sortedRdd2)
+  expect_equal(actual, numPairs)
+
+  # sort by string keys
+  l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
+  rdd3 <- parallelize(sc, l, 2L)
+  sortedRdd3 <- sortByKey(rdd3)
+  actual <- collect(sortedRdd3)
+  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+  
+  # test on the boundary cases
+  
+  # boundary case 1: the RDD to be sorted has only 1 partition
+  rdd4 <- parallelize(sc, l, 1L)
+  sortedRdd4 <- sortByKey(rdd4)
+  actual <- collect(sortedRdd4)
+  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+  # boundary case 2: the sorted RDD has only 1 partition
+  rdd5 <- parallelize(sc, l, 2L)
+  sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
+  actual <- collect(sortedRdd5)
+  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+  # boundary case 3: the RDD to be sorted has only 1 element
+  l2 <- list(list("a", 1))
+  rdd6 <- parallelize(sc, l2, 2L)
+  sortedRdd6 <- sortByKey(rdd6)
+  actual <- collect(sortedRdd6)
+  expect_equal(actual, l2)
+
+  # boundary case 4: the RDD to be sorted has 0 element
+  l3 <- list()
+  rdd7 <- parallelize(sc, l3, 2L)
+  sortedRdd7 <- sortByKey(rdd7)
+  actual <- collect(sortedRdd7)
+  expect_equal(actual, l3)  
+})
+
+test_that("collectAsMap() on a pairwise RDD", {
+  rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+  vals <- collectAsMap(rdd)
+  expect_equal(vals, list(`1` = 2, `3` = 4))
+
+  rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  vals <- collectAsMap(rdd)
+  expect_equal(vals, list(a = 1, b = 2))
+ 
+  rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
+  vals <- collectAsMap(rdd)
+  expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
+ 
+  rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
+  vals <- collectAsMap(rdd)
+  expect_equal(vals, list(`1` = "a", `2` = "b"))
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe0a1aa/R/pkg/inst/tests/test_shuffle.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_shuffle.R b/R/pkg/inst/tests/test_shuffle.R
new file mode 100644
index 0000000..d1da823
--- /dev/null
+++ b/R/pkg/inst/tests/test_shuffle.R
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("partitionBy, groupByKey, reduceByKey etc.")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
+doubleRdd <- parallelize(sc, doublePairs, 2L)
+
+numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
+                 list(3L, 0))
+numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
+
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
+                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
+strListRDD <- parallelize(sc, strList, 4)
+
+test_that("groupByKey for integers", {
+  grouped <- groupByKey(intRdd, 2L)
+
+  actual <- collect(grouped)
+
+  expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("groupByKey for doubles", {
+  grouped <- groupByKey(doubleRdd, 2L)
+
+  actual <- collect(grouped)
+
+  expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for ints", {
+  reduced <- reduceByKey(intRdd, "+", 2L)
+
+  actual <- collect(reduced)
+
+  expected <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for doubles", {
+  reduced <- reduceByKey(doubleRdd, "+", 2L)
+  actual <- collect(reduced)
+
+  expected <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for ints", {
+  reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
+
+  actual <- collect(reduced)
+
+  expected <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for doubles", {
+  reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
+  actual <- collect(reduced)
+
+  expected <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("aggregateByKey", {
+  # test aggregateByKey for int keys
+  rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+
+  zeroValue <- list(0, 0)
+  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)   
+  
+  actual <- collect(aggregatedRDD)
+  
+  expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test aggregateByKey for string keys
+  rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
+  
+  zeroValue <- list(0, 0)
+  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)   
+
+  actual <- collect(aggregatedRDD)
+  
+  expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("foldByKey", {  
+  # test foldByKey for int keys
+  folded <- foldByKey(intRdd, 0, "+", 2L)
+  
+  actual <- collect(folded)
+  
+  expected <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test foldByKey for double keys
+  folded <- foldByKey(doubleRdd, 0, "+", 2L)
+  
+  actual <- collect(folded)
+
+  expected <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test foldByKey for string keys
+  stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
+  
+  stringKeyRDD <- parallelize(sc, stringKeyPairs)
+  folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
+  
+  actual <- collect(folded)
+  
+  expected <- list(list("b", 101), list("a", 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+  
+  # test foldByKey for empty pair RDD
+  rdd <- parallelize(sc, list())
+  folded <- foldByKey(rdd, 0, "+", 2L)
+  actual <- collect(folded)
+  expected <- list()
+  expect_equal(actual, expected)
+
+  # test foldByKey for RDD with only 1 pair
+  rdd <- parallelize(sc,  list(list(1, 1)))
+  folded <- foldByKey(rdd, 0, "+", 2L)
+  actual <- collect(folded)
+  expected <- list(list(1, 1))
+  expect_equal(actual, expected)
+})
+
+test_that("partitionBy() partitions data correctly", {
+  # Partition by magnitude
+  partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
+
+  resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
+
+  expected_first <- list(list(1, 100), list(2, 200)) # key < 3
+  expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key >= 3
+  actual_first <- collectPartition(resultRDD, 0L)
+  actual_second <- collectPartition(resultRDD, 1L)
+
+  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("partitionBy works with dependencies", {
+  kOne <- 1
+  partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
+
+  # Partition by parity
+  resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
+
+  # keys even; 100 %% 2 == 0
+  expected_first <- list(list(2, 200), list(4, -1))
+  # keys odd; 3 %% 2 == 1
+  expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
+  actual_first <- collectPartition(resultRDD, 0L)
+  actual_second <- collectPartition(resultRDD, 1L)
+
+  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("test partitionBy with string keys", {
+  words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  resultRDD <- partitionBy(wordCount, 2L)
+  expected_first <- list(list("Dexter", 1), list("Dexter", 1))
+  expected_second <- list(list("and", 1), list("and", 1))
+
+  actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
+                         collectPartition(resultRDD, 0L))
+  actual_second <- Filter(function(item) { item[[1]] == "and" },
+                          collectPartition(resultRDD, 1L))
+
+  expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+  expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org