Posted to commits@toree.apache.org by ch...@apache.org on 2017/01/23 01:06:26 UTC
[3/7] incubator-toree git commit: Remove SparkR fork (mariusvniekerk via chipsenkbeil) closes #87
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R b/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R
deleted file mode 100644
index c445d1b..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/sparkR.R
+++ /dev/null
@@ -1,360 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-.sparkREnv <- new.env()
-
-# Utility function that returns TRUE if we have an active connection to the
-# backend and FALSE otherwise
-connExists <- function(env) {
- tryCatch({
- exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
- },
- error = function(err) {
- return(FALSE)
- })
-}
-
-#' Stop the Spark context.
-#'
-#' Also terminates the backend this R session is connected to
-sparkR.stop <- function() {
- env <- .sparkREnv
- if (exists(".sparkRCon", envir = env)) {
- # cat("Stopping SparkR\n")
- if (exists(".sparkRjsc", envir = env)) {
- sc <- get(".sparkRjsc", envir = env)
- callJMethod(sc, "stop")
- rm(".sparkRjsc", envir = env)
- }
-
- if (exists(".backendLaunched", envir = env)) {
- callJStatic("SparkRHandler", "stopBackend")
- }
-
- # Also close the connection and remove it from our env
- conn <- get(".sparkRCon", envir = env)
- close(conn)
-
- rm(".sparkRCon", envir = env)
- rm(".scStartTime", envir = env)
- }
-
- if (exists(".monitorConn", envir = env)) {
- conn <- get(".monitorConn", envir = env)
- close(conn)
- rm(".monitorConn", envir = env)
- }
-
- # Clear all broadcast variables we have
- # as the jobj will not be valid if we restart the JVM
- clearBroadcastVariables()
-
- # Clear jobj maps
- clearJobjs()
-}
-
-#' Initialize a new Spark Context.
-#'
-#' This function initializes a new SparkContext.
-#'
-#' @param master The Spark master URL.
-#' @param appName Application name to register with cluster manager
-#' @param sparkHome Spark Home directory
-#' @param sparkEnvir Named list of environment variables to set on worker nodes.
-#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
-#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
-#' @param sparkPackages Character string vector of packages from spark-packages.org
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
-#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
-#' list(spark.executor.memory="1g"))
-#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
-#' list(spark.executor.memory="1g"),
-#' list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
-#' c("jarfile1.jar","jarfile2.jar"))
-#'}
-
-sparkR.init <- function(
- master = "",
- appName = "SparkR",
- sparkHome = Sys.getenv("SPARK_HOME"),
- sparkEnvir = list(),
- sparkExecutorEnv = list(),
- sparkJars = "",
- sparkPackages = "") {
-
- if (exists(".sparkRjsc", envir = .sparkREnv)) {
- cat(paste("Re-using existing Spark Context.",
- "Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
- return(get(".sparkRjsc", envir = .sparkREnv))
- }
-
- jars <- suppressWarnings(normalizePath(as.character(sparkJars)))
-
- # Classpath separator is ";" on Windows
- # On Windows the URI needs four slashes ("file:////"); see http://stackoverflow.com/a/18522792
- if (.Platform$OS.type == "unix") {
- uriSep <- "//"
- } else {
- uriSep <- "////"
- }
-
- existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
- if (existingPort != "") {
- backendPort <- existingPort
- } else {
- path <- tempfile(pattern = "backend_port")
- launchBackend(
- args = path,
- sparkHome = sparkHome,
- jars = jars,
- sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
- packages = sparkPackages)
- # wait at most 100 seconds for the JVM to launch
- wait <- 0.1
- for (i in 1:25) {
- Sys.sleep(wait)
- if (file.exists(path)) {
- break
- }
- wait <- wait * 1.25
- }
- if (!file.exists(path)) {
- stop("JVM is not ready after 10 seconds")
- }
- f <- file(path, open="rb")
- backendPort <- readInt(f)
- monitorPort <- readInt(f)
- close(f)
- file.remove(path)
- if (length(backendPort) == 0 || backendPort == 0 ||
- length(monitorPort) == 0 || monitorPort == 0) {
- stop("JVM failed to launch")
- }
- assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
- assign(".backendLaunched", 1, envir = .sparkREnv)
- }
-
- .sparkREnv$backendPort <- backendPort
- tryCatch({
- connectBackend("localhost", backendPort)
- },
- error = function(err) {
- stop("Failed to connect JVM\n")
- })
-
- if (nchar(sparkHome) != 0) {
- sparkHome <- suppressWarnings(normalizePath(sparkHome))
- }
-
- sparkEnvirMap <- new.env()
- for (varname in names(sparkEnvir)) {
- sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
- }
-
- sparkExecutorEnvMap <- new.env()
- if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
- sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
- }
- for (varname in names(sparkExecutorEnv)) {
- sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
- }
-
- nonEmptyJars <- Filter(function(x) { x != "" }, jars)
- localJarPaths <- sapply(nonEmptyJars,
- function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
-
- # Set the start time to identify jobjs
- # Seconds resolution is good enough for this purpose, so use ints
- assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
-
- assign(
- ".sparkRjsc",
- callJStatic(
- "org.apache.spark.api.r.RRDD",
- "createSparkContext",
- master,
- appName,
- as.character(sparkHome),
- as.list(localJarPaths),
- sparkEnvirMap,
- sparkExecutorEnvMap),
- envir = .sparkREnv
- )
-
- sc <- get(".sparkRjsc", envir = .sparkREnv)
-
- # Register a finalizer to sleep 1 second on R exit to make RStudio happy
- reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
-
- sc
-}
-
-#' Connect to the R backend.
-#'
-#' This function establishes a connection with the R backend without creating
-#' a new SparkContext.
-#'
-#' @export
-#' @examples
-#'\dontrun{
-#' sparkR.connect()
-#'}
-
-sparkR.connect <- function() {
- if (connExists(.sparkREnv)) {
- print("Connection to SparkR backend has already been established!")
- return()
- }
-
- # Only allow connecting to an existing backend
- existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
- if (existingPort != "") {
- backendPort <- existingPort
- } else {
- stop("No existing backend port found!")
- }
-
- # Connect to the backend service
- .sparkREnv$backendPort <- backendPort
- tryCatch({
- connectBackend("localhost", backendPort)
- }, error = function(err) {
- stop("Failed to connect JVM: ", err)
- })
-
- # Set the start time to identify jobjs
- # Seconds resolution is good enough for this purpose, so use ints
- assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
-
- # Register a finalizer to sleep 1 second on R exit to make RStudio happy
- reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
-}
-
-#' Initialize a new SQLContext.
-#'
-#' This function creates a new SQLContext from an existing JavaSparkContext
-#' (creating one with sparkR.init() if none is supplied).
-#'
-#' @param jsc The existing JavaSparkContext created with sparkR.init()
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlContext <- sparkRSQL.init(sc)
-#'}
-
-sparkRSQL.init <- function(jsc = NULL) {
- if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
- return(get(".sparkRSQLsc", envir = .sparkREnv))
- }
-
- # If jsc is NULL, create a Spark Context
- sc <- if (is.null(jsc)) {
- sparkR.init()
- } else {
- jsc
- }
-
- sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
- "createSQLContext",
- sc)
- assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv)
- sqlContext
-}
-
-#' Initialize a new HiveContext.
-#'
-#' This function creates a HiveContext from an existing JavaSparkContext
-#'
-#' @param jsc The existing JavaSparkContext created with sparkR.init()
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlContext <- sparkRHive.init(sc)
-#'}
-
-sparkRHive.init <- function(jsc = NULL) {
- if (exists(".sparkRHivesc", envir = .sparkREnv)) {
- return(get(".sparkRHivesc", envir = .sparkREnv))
- }
-
- # If jsc is NULL, create a Spark Context
- sc <- if (is.null(jsc)) {
- sparkR.init()
- } else {
- jsc
- }
-
- ssc <- callJMethod(sc, "sc")
- hiveCtx <- tryCatch({
- newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
- },
- error = function(err) {
- stop("Spark SQL is not built with Hive support")
- })
-
- assign(".sparkRHivesc", hiveCtx, envir = .sparkREnv)
- hiveCtx
-}
-
-#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
-#' different value or cleared.
-#'
-#' @param sc existing spark context
-#' @param groupId the ID to be assigned to job groups
-#' @param description description for the job group ID
-#' @param interruptOnCancel flag to indicate whether the job is interrupted on job cancellation
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
-#'}
-
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
- callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
-}
-
-#' Clear current job group ID and its description
-#'
-#' @param sc existing spark context
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
-#'}
-
-clearJobGroup <- function(sc) {
- callJMethod(sc, "clearJobGroup")
-}
-
-#' Cancel active jobs for the specified group
-#'
-#' @param sc existing spark context
-#' @param groupId the ID of job group to be cancelled
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
-#'}
-
-cancelJobGroup <- function(sc, groupId) {
- callJMethod(sc, "cancelJobGroup", groupId)
-}
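
For reference, an end-to-end session against the API removed above looked
roughly like the following. This is an illustrative sketch: the master URL,
application name, and memory setting are example values, not taken from this
commit.

    library(SparkR)

    # Start a local context, then a SQL context on top of it.
    sc <- sparkR.init(master = "local[2]", appName = "example",
                      sparkEnvir = list(spark.executor.memory = "1g"))
    sqlContext <- sparkRSQL.init(sc)

    # Tag subsequent jobs so they can be cancelled or cleared as a group.
    setJobGroup(sc, "myJobGroup", "illustrative job group", TRUE)
    cancelJobGroup(sc, "myJobGroup")
    clearJobGroup(sc)

    # Tear down the context, the backend, and the monitor connection.
    sparkR.stop()
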
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R b/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R
deleted file mode 100644
index 3babcb5..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/R/utils.R
+++ /dev/null
@@ -1,600 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Utilities and Helpers
-
-# Given a JList<T>, returns an R list containing the same elements, the number
-# of which is optionally upper bounded by `logicalUpperBound` (by default,
-# return all elements). Takes care of deserializations and type conversions.
-convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
- serializedMode = "byte") {
- arrSize <- callJMethod(jList, "size")
-
- # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()):
- # each partition is not dense-packed into one Array[Byte], and `arrSize`
- # here corresponds to number of logical elements. Thus we can prune here.
- if (serializedMode == "string" && !is.null(logicalUpperBound)) {
- arrSize <- min(arrSize, logicalUpperBound)
- }
-
- results <- if (arrSize > 0) {
- lapply(0 : (arrSize - 1),
- function(index) {
- obj <- callJMethod(jList, "get", as.integer(index))
-
- # Assume it is either an R object or a Java obj ref.
- if (inherits(obj, "jobj")) {
- if (isInstanceOf(obj, "scala.Tuple2")) {
- # JavaPairRDD[Array[Byte], Array[Byte]].
-
- keyBytes <- callJMethod(obj, "_1")
- valBytes <- callJMethod(obj, "_2")
- res <- list(unserialize(keyBytes),
- unserialize(valBytes))
- } else {
- stop(paste("utils.R: convertJListToRList only supports",
- "RDD[Array[Byte]] and",
- "JavaPairRDD[Array[Byte], Array[Byte]] for now"))
- }
- } else {
- if (inherits(obj, "raw")) {
- if (serializedMode == "byte") {
- # RDD[Array[Byte]]. `obj` is a whole partition.
- res <- unserialize(obj)
- # For serialized datasets, `obj` here corresponds to one whole
- # partition dense-packed together. We deserialize the whole partition
- # first, then cap the number of elements to be returned.
- } else if (serializedMode == "row") {
- res <- readRowList(obj)
- # For DataFrames that have been converted to RRDDs, we call readRowList
- # which will read in each row of the RRDD as a list and deserialize
- # each element.
- flatten <<- FALSE
- # Use global assignment to change the flatten flag. This means
- # we don't have to worry about the default argument in other functions
- # e.g. collect
- }
- # TODO: is it possible to distinguish element boundary so that we can
- # unserialize only what we need?
- if (!is.null(logicalUpperBound)) {
- res <- head(res, n = logicalUpperBound)
- }
- } else {
- # obj is of a primitive Java type, and is simplified to R's
- # corresponding type.
- res <- list(obj)
- }
- }
- res
- })
- } else {
- list()
- }
-
- if (flatten) {
- as.list(unlist(results, recursive = FALSE))
- } else {
- as.list(results)
- }
-}
-
-# Returns TRUE if `name` refers to an RDD in the given environment `env`
-isRDD <- function(name, env) {
- obj <- get(name, envir = env)
- inherits(obj, "RDD")
-}
-
-#' Compute the hashCode of an object
-#'
-#' Java-style function to compute the hashCode for the given object. Returns
-#' an integer value.
-#'
-#' @details
-#' This only works for integer, numeric and character types right now.
-#'
-#' @param key the object to be hashed
-#' @return the hash code as an integer
-#' @export
-#' @examples
-#' hashCode(1L) # 1
-#' hashCode(1.0) # 1072693248
-#' hashCode("1") # 49
-hashCode <- function(key) {
- if (class(key) == "integer") {
- as.integer(key[[1]])
- } else if (class(key) == "numeric") {
- # Convert the double to long and then calculate the hash code
- rawVec <- writeBin(key[[1]], con = raw())
- intBits <- packBits(rawToBits(rawVec), "integer")
- as.integer(bitwXor(intBits[2], intBits[1]))
- } else if (class(key) == "character") {
- # TODO: SPARK-7839 means we might not have the native library available
- if (is.loaded("stringHashCode")) {
- .Call("stringHashCode", key)
- } else {
- n <- nchar(key)
- if (n == 0) {
- 0L
- } else {
- asciiVals <- sapply(charToRaw(key), function(x) { strtoi(x, 16L) })
- hashC <- 0
- for (k in 1:length(asciiVals)) {
- hashC <- mult31AndAdd(hashC, asciiVals[k])
- }
- as.integer(hashC)
- }
- }
- } else {
- warning(paste("Could not hash object, returning 0", sep = ""))
- as.integer(0)
- }
-}
-
-# Helper function used to wrap a 'numeric' value to integer bounds.
-# Useful for implementing C-like integer arithmetic
-wrapInt <- function(value) {
- if (value > .Machine$integer.max) {
- value <- value - 2 * .Machine$integer.max - 2
- } else if (value < -1 * .Machine$integer.max) {
- value <- 2 * .Machine$integer.max + value + 2
- }
- value
-}
-
-# Multiply `val` by 31 and add `addVal` to the result. Ensures that
-# integer overflows are handled at every step.
-mult31AndAdd <- function(val, addVal) {
- vec <- c(bitwShiftL(val, c(4,3,2,1,0)), addVal)
- Reduce(function(a, b) {
- wrapInt(as.numeric(a) + as.numeric(b))
- },
- vec)
-}
-
-# Create a new RDD with serializedMode == "byte".
-# Return itself if already in "byte" format.
-serializeToBytes <- function(rdd) {
- if (!inherits(rdd, "RDD")) {
- stop("Argument 'rdd' is not an RDD type.")
- }
- if (getSerializedMode(rdd) != "byte") {
- ser.rdd <- lapply(rdd, function(x) { x })
- return(ser.rdd)
- } else {
- return(rdd)
- }
-}
-
-# Create a new RDD with serializedMode == "string".
-# Return itself if already in "string" format.
-serializeToString <- function(rdd) {
- if (!inherits(rdd, "RDD")) {
- stop("Argument 'rdd' is not an RDD type.")
- }
- if (getSerializedMode(rdd) != "string") {
- ser.rdd <- lapply(rdd, function(x) { toString(x) })
- # force it to create jrdd using "string"
- getJRDD(ser.rdd, serializedMode = "string")
- return(ser.rdd)
- } else {
- return(rdd)
- }
-}
-
-# Fast append to list by using an accumulator.
-# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
-#
-# The accumulator should have three fields: size, counter, and data.
-# This function amortizes the allocation cost by doubling
-# the size of the list every time it fills up.
-addItemToAccumulator <- function(acc, item) {
- if (acc$counter == acc$size) {
- acc$size <- acc$size * 2
- length(acc$data) <- acc$size
- }
- acc$counter <- acc$counter + 1
- acc$data[[acc$counter]] <- item
-}
-
-initAccumulator <- function() {
- acc <- new.env()
- acc$counter <- 0
- acc$data <- list(NULL)
- acc$size <- 1
- acc
-}
-
-# Utility function to sort a list of key value pairs
-# Used in unit tests
-sortKeyValueList <- function(kv_list, decreasing = FALSE) {
- keys <- sapply(kv_list, function(x) x[[1]])
- kv_list[order(keys, decreasing = decreasing)]
-}
-
-# Utility function to generate compact R lists from grouped rdd
-# Used in Join-family functions
-# param:
-# tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
-# cnull Boolean list where each element determines whether the corresponding list should
-# be converted to list(NULL)
-genCompactLists <- function(tagged_list, cnull) {
- len <- length(tagged_list)
- lists <- list(vector("list", len), vector("list", len))
- index <- list(1, 1)
-
- for (x in tagged_list) {
- tag <- x[[1]]
- idx <- index[[tag]]
- lists[[tag]][[idx]] <- x[[2]]
- index[[tag]] <- idx + 1
- }
-
- len <- lapply(index, function(x) x - 1)
- for (i in (1:2)) {
- if (cnull[[i]] && len[[i]] == 0) {
- lists[[i]] <- list(NULL)
- } else {
- length(lists[[i]]) <- len[[i]]
- }
- }
-
- lists
-}
-
-# Utility function to merge compact R lists
-# Used in Join-family functions
-# param:
-# left/right Two compact lists ready for Cartesian product
-mergeCompactLists <- function(left, right) {
- result <- list()
- length(result) <- length(left) * length(right)
- index <- 1
- for (i in left) {
- for (j in right) {
- result[[index]] <- list(i, j)
- index <- index + 1
- }
- }
- result
-}
-
-# Utility function that wraps the above two operations
-# Used in Join-family functions
-# param (same as genCompactLists):
-# tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
-# cnull Boolean list where each element determines whether the corresponding list should
-# be converted to list(NULL)
-joinTaggedList <- function(tagged_list, cnull) {
- lists <- genCompactLists(tagged_list, cnull)
- mergeCompactLists(lists[[1]], lists[[2]])
-}
-
-# Utility function to reduce a key-value list with predicate
-# Used in *ByKey functions
-# param
-# pair key-value pair
-# keys/vals env of key/value with hashes
-# updateOrCreatePred predicate function
-# updateFn update or merge function for an existing pair, similar to `mergeVal` in combineByKey
-# createFn create function for a new pair, similar to `createCombiner` in combineByKey
-updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
- # assume hashVal bind to `$hash`, key/val with index 1/2
- hashVal <- pair$hash
- key <- pair[[1]]
- val <- pair[[2]]
- if (updateOrCreatePred(pair)) {
- assign(hashVal, do.call(updateFn, list(get(hashVal, envir = vals), val)), envir = vals)
- } else {
- assign(hashVal, do.call(createFn, list(val)), envir = vals)
- assign(hashVal, key, envir = keys)
- }
-}
-
-# Utility function to convert the key and value environments into a key-value list
-convertEnvsToList <- function(keys, vals) {
- lapply(ls(keys),
- function(name) {
- list(keys[[name]], vals[[name]])
- })
-}
-
-# Utility function to capture the varargs into environment object
-varargsToEnv <- function(...) {
- # Based on http://stackoverflow.com/a/3057419/4577954
- pairs <- list(...)
- env <- new.env()
- for (name in names(pairs)) {
- env[[name]] <- pairs[[name]]
- }
- env
-}
-
-getStorageLevel <- function(newLevel = c("DISK_ONLY",
- "DISK_ONLY_2",
- "MEMORY_AND_DISK",
- "MEMORY_AND_DISK_2",
- "MEMORY_AND_DISK_SER",
- "MEMORY_AND_DISK_SER_2",
- "MEMORY_ONLY",
- "MEMORY_ONLY_2",
- "MEMORY_ONLY_SER",
- "MEMORY_ONLY_SER_2",
- "OFF_HEAP")) {
- match.arg(newLevel)
- storageLevelClass <- "org.apache.spark.storage.StorageLevel"
- storageLevel <- switch(newLevel,
- "DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
- "DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
- "MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
- "MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
- "MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
- "MEMORY_AND_DISK_SER"),
- "MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
- "MEMORY_AND_DISK_SER_2"),
- "MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
- "MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
- "MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
- "MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
- "OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
-}
-
-# Utility function for functions where an argument needs to be integer but we want to allow
-# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
-numToInt <- function(num) {
- if (as.integer(num) != num) {
- warning(paste("Coercing", as.list(sys.call())[[2]], "to integer."))
- }
- as.integer(num)
-}
-
-# create a Seq in JVM
-toSeq <- function(...) {
- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", list(...))
-}
-
-# create a Seq in JVM from a list
-listToSeq <- function(l) {
- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", l)
-}
-
-# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
-# user defined function (UDF), and to examine variables in the UDF to decide
-# if their values should be included in the new function environment.
-# param
-# node The current AST node in the traversal.
-# oldEnv The original function environment.
-# defVars An Accumulator of variable names defined in the function's calling environment,
-# including function argument and local variable names.
-# checkedFuncs An environment of function objects examined during cleanClosure. It can
-# be considered as a "name"-to-"list of functions" mapping.
-# newEnv A new function environment to store necessary function dependencies, an output argument.
-processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
- nodeLen <- length(node)
-
- if (nodeLen > 1 && typeof(node) == "language") {
- # Recursive case: current AST node is an internal node, check for its children.
- if (length(node[[1]]) > 1) {
- for (i in 1:nodeLen) {
- processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- } else {
- # If node[[1]] has length 1, check for some special R functions.
- nodeChar <- as.character(node[[1]])
- if (nodeChar == "{" || nodeChar == "(") {
- # Skip start symbol.
- for (i in 2:nodeLen) {
- processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- } else if (nodeChar == "<-" || nodeChar == "=" ||
- nodeChar == "<<-") {
- # Assignment Ops.
- defVar <- node[[2]]
- if (length(defVar) == 1 && typeof(defVar) == "symbol") {
- # Add the defined variable name into defVars.
- addItemToAccumulator(defVars, as.character(defVar))
- } else {
- processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- for (i in 3:nodeLen) {
- processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- } else if (nodeChar == "function") {
- # Function definition.
- # Add parameter names.
- newArgs <- names(node[[2]])
- lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
- for (i in 3:nodeLen) {
- processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- } else if (nodeChar == "$") {
- # Skip the field.
- processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
- } else if (nodeChar == "::" || nodeChar == ":::") {
- processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
- } else {
- for (i in 1:nodeLen) {
- processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
- }
- }
- }
- } else if (nodeLen == 1 &&
- (typeof(node) == "symbol" || typeof(node) == "language")) {
- # Base case: current AST node is a leaf node and a symbol or a function call.
- nodeChar <- as.character(node)
- if (!nodeChar %in% defVars$data) {
- # Not a function parameter or local variable.
- func.env <- oldEnv
- topEnv <- parent.env(.GlobalEnv)
- # Search the function's environment and its enclosing environments up to
- # the global environment. There is no need to search package environments
- # above the global environment, or non-SparkR namespace environments
- # below it, as those are assumed to be loaded on the workers.
- while (!identical(func.env, topEnv)) {
- # Namespaces other than "SparkR" will not be searched.
- if (!isNamespace(func.env) ||
- (getNamespaceName(func.env) == "SparkR" &&
- !(nodeChar %in% getNamespaceExports("SparkR")))) {
- # Only include SparkR internals.
-
- # Set parameter 'inherits' to FALSE since we do not need to search in
- # attached package environments.
- if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
- error = function(e) { FALSE })) {
- obj <- get(nodeChar, envir = func.env, inherits = FALSE)
- if (is.function(obj)) {
- # If the node is a function call.
- funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
- ifnotfound = list(list(NULL)))[[1]]
- found <- sapply(funcList, function(func) {
- identical(func, obj)
- })
- if (sum(found) > 0) {
- # If function has been examined, ignore.
- break
- }
- # Function has not been examined, record it and recursively clean its closure.
- assign(nodeChar,
- if (is.null(funcList[[1]])) {
- list(obj)
- } else {
- append(funcList, obj)
- },
- envir = checkedFuncs)
- obj <- cleanClosure(obj, checkedFuncs)
- }
- assign(nodeChar, obj, envir = newEnv)
- break
- }
- }
-
- # Continue to search in enclosure.
- func.env <- parent.env(func.env)
- }
- }
- }
-}
-
-# Utility function to get user defined function (UDF) dependencies (closure).
-# More specifically, this function captures the values of free variables defined
-# outside a UDF, and stores them in the function's environment.
-# param
-# func A function whose closure needs to be captured.
-# checkedFuncs An environment of function objects examined during cleanClosure. It can be
-# considered as a "name"-to-"list of functions" mapping.
-# return value
-# a new version of func that has a correct environment (closure).
-cleanClosure <- function(func, checkedFuncs = new.env()) {
- if (is.function(func)) {
- newEnv <- new.env(parent = .GlobalEnv)
- func.body <- body(func)
- oldEnv <- environment(func)
- # defVars is an Accumulator of variable names defined in the function's calling
- # environment. First, the function's arguments are added to defVars.
- defVars <- initAccumulator()
- argNames <- names(as.list(args(func)))
- for (i in 1:(length(argNames) - 1)) {
- # Remove the ending NULL in pairlist.
- addItemToAccumulator(defVars, argNames[i])
- }
- # Recursively examine variables in the function body.
- processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
- environment(func) <- newEnv
- }
- func
-}
-
-# Append partition lengths to each partition in two input RDDs if needed.
-# param
-# x An RDD.
-# other An RDD.
-# return value
-# A list of two result RDDs.
-appendPartitionLengths <- function(x, other) {
- if (getSerializedMode(x) != getSerializedMode(other) ||
- getSerializedMode(x) == "byte") {
- # Append the number of elements in each partition to that partition so that we can later
- # know the boundary of elements from x and other.
- #
- # Note that this appending also serves the purpose of reserialization, because even if
- # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
- # as a single byte array. For example, partitions of an RDD generated from partitionBy()
- # may be encoded as multiple byte arrays.
- appendLength <- function(part) {
- len <- length(part)
- part[[len + 1]] <- len + 1
- part
- }
- x <- lapplyPartition(x, appendLength)
- other <- lapplyPartition(other, appendLength)
- }
- list(x, other)
-}
-
-# Perform zip or cartesian between elements from two RDDs in each partition
-# param
-# rdd An RDD.
-# zip A boolean flag indicating whether this call is for a zip operation.
-# return value
-# A result RDD.
-mergePartitions <- function(rdd, zip) {
- serializerMode <- getSerializedMode(rdd)
- partitionFunc <- function(partIndex, part) {
- len <- length(part)
- if (len > 0) {
- if (serializerMode == "byte") {
- lengthOfValues <- part[[len]]
- lengthOfKeys <- part[[len - lengthOfValues]]
- stopifnot(len == lengthOfKeys + lengthOfValues)
-
- # For zip operation, check if corresponding partitions
- # of both RDDs have the same number of elements.
- if (zip && lengthOfKeys != lengthOfValues) {
- stop(paste("Can only zip RDDs with same number of elements",
- "in each pair of corresponding partitions."))
- }
-
- if (lengthOfKeys > 1) {
- keys <- part[1 : (lengthOfKeys - 1)]
- } else {
- keys <- list()
- }
- if (lengthOfValues > 1) {
- values <- part[ (lengthOfKeys + 1) : (len - 1) ]
- } else {
- values <- list()
- }
-
- if (!zip) {
- return(mergeCompactLists(keys, values))
- }
- } else {
- keys <- part[c(TRUE, FALSE)]
- values <- part[c(FALSE, TRUE)]
- }
- mapply(
- function(k, v) { list(k, v) },
- keys,
- values,
- SIMPLIFY = FALSE,
- USE.NAMES = FALSE)
- } else {
- part
- }
- }
-
- PipelinedRDD(rdd, partitionFunc)
-}
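
The hashCode()/wrapInt()/mult31AndAdd() trio removed above implements Java's
String.hashCode recurrence (h = 31*h + c) while emulating 32-bit integer
overflow, which base R lacks. Below is a self-contained sketch of the same
recurrence that reduces modulo 2^32 instead of using the shift-and-wrap
steps; the function name is mine, and ASCII input is assumed.

    # Java-style 32-bit string hash in base R. R doubles are exact well past
    # 2^32, so reduce modulo 2^32 at each step, then reinterpret as signed.
    javaStringHash <- function(s) {
      h <- 0
      for (code in utf8ToInt(s)) {
        h <- (31 * h + code) %% 4294967296   # stay within one 32-bit period
      }
      if (h >= 2147483648) h <- h - 4294967296  # unsigned -> signed int
      as.integer(h)
    }

    javaStringHash("1")   # 49, matching the hashCode("1") example above
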
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R
deleted file mode 100644
index 2a8a821..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/general.R
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-.First <- function() {
- packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
- .libPaths(c(packageDir, .libPaths()))
- Sys.setenv(NOAWT=1)
-}
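
This profile only has an effect if the launching process exports
SPARKR_PACKAGE_DIR before starting R. The snippet below emulates that
contract from an interactive session; the path is illustrative, and sourcing
the file then calling .First() manually stands in for R's normal startup
sequence.

    Sys.setenv(SPARKR_PACKAGE_DIR = "/opt/spark/R/lib")  # illustrative path
    source("general.R"); .First()    # emulate what R does at startup
    .libPaths()[1]                   # now "/opt/spark/R/lib"
    Sys.getenv("NOAWT")              # "1"
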
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R
deleted file mode 100644
index 7189f1a..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/profile/shell.R
+++ /dev/null
@@ -1,47 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-.First <- function() {
- home <- Sys.getenv("SPARK_HOME")
- .libPaths(c(file.path(home, "R", "lib"), .libPaths()))
- Sys.setenv(NOAWT=1)
-
- # Make sure SparkR package is the last loaded one
- old <- getOption("defaultPackages")
- options(defaultPackages = c(old, "SparkR"))
-
- sc <- SparkR::sparkR.init()
- assign("sc", sc, envir=.GlobalEnv)
- sqlContext <- SparkR::sparkRSQL.init(sc)
- sparkVer <- SparkR:::callJMethod(sc, "version")
- assign("sqlContext", sqlContext, envir=.GlobalEnv)
- cat("\n Welcome to")
- cat("\n")
- cat(" ____ __", "\n")
- cat(" / __/__ ___ _____/ /__", "\n")
- cat(" _\\ \\/ _ \\/ _ `/ __/ '_/", "\n")
- cat(" /___/ .__/\\_,_/_/ /_/\\_\\")
- if (nchar(sparkVer) == 0) {
- cat("\n")
- } else {
- cat(" version ", sparkVer, "\n")
- }
- cat(" /_/", "\n")
- cat("\n")
-
- cat("\n Spark context is available as sc, SQL context is available as sqlContext\n")
-}
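
The net effect of this profile is that an interactive shell starts with sc
and sqlContext already bound in the global environment, so a first command
could be, for instance (faithful is a built-in R dataset, used here purely
for illustration):

    df <- createDataFrame(sqlContext, faithful)  # sc/sqlContext preexist
    head(df)
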
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar b/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar
deleted file mode 100644
index 1d5c2af..0000000
Binary files a/sparkr-interpreter/src/main/resources/R/pkg/inst/test_support/sparktestjar_2.10-1.0.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R
deleted file mode 100644
index d68bb20..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/jarTest.R
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-library(SparkR)
-
-sc <- sparkR.init()
-
-helloTest <- SparkR:::callJStatic("sparkR.test.hello",
- "helloWorld",
- "Dave")
-
-basicFunction <- SparkR:::callJStatic("sparkR.test.basicFunction",
- "addStuff",
- 2L,
- 2L)
-
-sparkR.stop()
-output <- c(helloTest, basicFunction)
-writeLines(output)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R
deleted file mode 100644
index 207a37a..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/packageInAJarTest.R
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-library(SparkR)
-library(sparkPackageTest)
-
-sc <- sparkR.init()
-
-run1 <- myfunc(5L)
-
-run2 <- myfunc(-4L)
-
-sparkR.stop()
-
-if (run1 != 6) quit(save = "no", status = 1)
-
-if (run2 != -3) quit(save = "no", status = 1)
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R
deleted file mode 100644
index f2452ed..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binaryFile.R
+++ /dev/null
@@ -1,89 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions on binary files")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("saveAsObjectFile()/objectFile() following textFile() works", {
- fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
- fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
- writeLines(mockFile, fileName1)
-
- rdd <- textFile(sc, fileName1, 1)
- saveAsObjectFile(rdd, fileName2)
- rdd <- objectFile(sc, fileName2)
- expect_equal(collect(rdd), as.list(mockFile))
-
- unlink(fileName1)
- unlink(fileName2, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
- fileName <- tempfile(pattern="spark-test", fileext=".tmp")
-
- l <- list(1, 2, 3)
- rdd <- parallelize(sc, l, 1)
- saveAsObjectFile(rdd, fileName)
- rdd <- objectFile(sc, fileName)
- expect_equal(collect(rdd), l)
-
- unlink(fileName, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
- fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
- fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
- writeLines(mockFile, fileName1)
-
- rdd <- textFile(sc, fileName1)
-
- words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
- wordCount <- lapply(words, function(word) { list(word, 1L) })
-
- counts <- reduceByKey(wordCount, "+", 2L)
-
- saveAsObjectFile(counts, fileName2)
- counts <- objectFile(sc, fileName2)
-
- output <- collect(counts)
- expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
- list("is", 2))
- expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
-
- unlink(fileName1)
- unlink(fileName2, recursive = TRUE)
-})
-
-test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
- fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
- fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
-
- rdd1 <- parallelize(sc, "Spark is pretty.")
- saveAsObjectFile(rdd1, fileName1)
- rdd2 <- parallelize(sc, "Spark is awesome.")
- saveAsObjectFile(rdd2, fileName2)
-
- rdd <- objectFile(sc, c(fileName1, fileName2))
- expect_equal(count(rdd), 2)
-
- unlink(fileName1, recursive = TRUE)
- unlink(fileName2, recursive = TRUE)
-})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R
deleted file mode 100644
index f054ac9..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_binary_function.R
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("binary functions")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Data
-nums <- 1:10
-rdd <- parallelize(sc, nums, 2L)
-
-# File content
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("union on two RDDs", {
- actual <- collect(unionRDD(rdd, rdd))
- expect_equal(actual, as.list(rep(nums, 2)))
-
- fileName <- tempfile(pattern="spark-test", fileext=".tmp")
- writeLines(mockFile, fileName)
-
- text.rdd <- textFile(sc, fileName)
- union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
- expect_equal(actual, c(as.list(nums), mockFile))
- expect_equal(getSerializedMode(union.rdd), "byte")
-
- rdd <- map(text.rdd, function(x) {x})
- union.rdd <- unionRDD(rdd, text.rdd)
- actual <- collect(union.rdd)
- expect_equal(actual, as.list(c(mockFile, mockFile)))
- expect_equal(getSerializedMode(union.rdd), "byte")
-
- unlink(fileName)
-})
-
-test_that("cogroup on two RDDs", {
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
- expect_equal(actual,
- list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
- rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
- cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
- actual <- collect(cogroup.rdd)
-
- expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-})
-
-test_that("zipPartitions() on RDDs", {
- rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
- rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
- rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
- actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
- func = function(x, y, z) { list(list(x, y, z))} ))
- expect_equal(actual,
- list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
-
- mockFile <- c("Spark is pretty.", "Spark is awesome.")
- fileName <- tempfile(pattern="spark-test", fileext=".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName, 1)
- actual <- collect(zipPartitions(rdd, rdd,
- func = function(x, y) { list(paste(x, y, sep = "\n")) }))
- expected <- list(paste(mockFile, mockFile, sep = "\n"))
- expect_equal(actual, expected)
-
- rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collect(zipPartitions(rdd1, rdd,
- func = function(x, y) { list(x + nchar(y)) }))
- expected <- list(0:1 + nchar(mockFile))
- expect_equal(actual, expected)
-
- rdd <- map(rdd, function(x) { x })
- actual <- collect(zipPartitions(rdd, rdd1,
- func = function(x, y) { list(y + nchar(x)) }))
- expect_equal(actual, expected)
-
- unlink(fileName)
-})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R
deleted file mode 100644
index bb86a5c..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_broadcast.R
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("broadcast variables")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Partitioned data
-nums <- 1:2
-rrdd <- parallelize(sc, nums, 2L)
-
-test_that("using broadcast variable", {
- randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
- randomMatBr <- broadcast(sc, randomMat)
-
- useBroadcast <- function(x) {
- sum(SparkR:::value(randomMatBr) * x)
- }
- actual <- collect(lapply(rrdd, useBroadcast))
- expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
- expect_equal(actual, expected)
-})
-
-test_that("without using broadcast variable", {
- randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
-
- useBroadcast <- function(x) {
- sum(randomMat * x)
- }
- actual <- collect(lapply(rrdd, useBroadcast))
- expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
- expect_equal(actual, expected)
-})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R
deleted file mode 100644
index 8a20991..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_client.R
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions in client.R")
-
-test_that("adding spark-testing-base as a package works", {
- args <- generateSparkSubmitArgs("", "", "", "",
- "holdenk:spark-testing-base:1.3.0_0.0.5")
- expect_equal(gsub("[[:space:]]", "", args),
- gsub("[[:space:]]", "",
- "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
-})
-
-test_that("no package specified doesn't add packages flag", {
- args <- generateSparkSubmitArgs("", "", "", "", "")
- expect_equal(gsub("[[:space:]]", "", args),
- "")
-})
-
-test_that("multiple packages don't produce a warning", {
- expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
-})
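
generateSparkSubmitArgs() itself lives in the also-removed client.R (not part
of this hunk), but the tests above pin down its observable contract: all-empty
inputs collapse to whitespace, and a non-empty packages argument produces a
--packages flag. A minimal sketch consistent with those expectations, inferred
from the tests rather than copied from the removed implementation:

    generateSparkSubmitArgsSketch <- function(args, sparkHome, jars,
                                              sparkSubmitOpts, packages) {
      packagesOpt <- if (any(nzchar(packages))) {
        paste("--packages", paste(packages, collapse = ","))
      } else {
        ""
      }
      # Blank components contribute only whitespace, which the tests strip
      # with gsub() before comparing.
      paste(sparkHome, jars, sparkSubmitOpts, packagesOpt, args)
    }
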
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R
deleted file mode 100644
index 513bbc8..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_context.R
+++ /dev/null
@@ -1,57 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("test functions in sparkR.R")
-
-test_that("repeatedly starting and stopping SparkR", {
- for (i in 1:4) {
- sc <- sparkR.init()
- rdd <- parallelize(sc, 1:20, 2L)
- expect_equal(count(rdd), 20)
- sparkR.stop()
- }
-})
-
-test_that("rdd GC across sparkR.stop", {
- sparkR.stop()
- sc <- sparkR.init() # sc should get id 0
- rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
- rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
- sparkR.stop()
-
- sc <- sparkR.init() # sc should get id 0 again
-
- # GC rdd1 before creating rdd3 and rdd2 after
- rm(rdd1)
- gc()
-
- rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
- rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
-
- rm(rdd2)
- gc()
-
- count(rdd3)
- count(rdd4)
-})
-
-test_that("job group functions can be called", {
- sc <- sparkR.init()
- setJobGroup(sc, "groupId", "job description", TRUE)
- cancelJobGroup(sc, "groupId")
- clearJobGroup(sc)
-})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R
deleted file mode 100644
index cc1faea..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includeJAR.R
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-context("include an external JAR in SparkContext")
-
-runScript <- function() {
- sparkHome <- Sys.getenv("SPARK_HOME")
- sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
- jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
- scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
- submitPath <- file.path(sparkHome, "bin/spark-submit")
- res <- system2(command = submitPath,
- args = c(jarPath, scriptPath),
- stdout = TRUE)
- tail(res, 2)
-}
-
-test_that("sparkJars tag in SparkContext", {
- testOutput <- runScript()
- helloTest <- testOutput[1]
- expect_equal(helloTest, "Hello, Dave")
- basicFunction <- testOutput[2]
- expect_equal(basicFunction, "4")
-})
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R
deleted file mode 100644
index 8152b44..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_includePackage.R
+++ /dev/null
@@ -1,57 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("include R packages")
-
-# JavaSparkContext handle
-sc <- sparkR.init()
-
-# Partitioned data
-nums <- 1:2
-rdd <- parallelize(sc, nums, 2L)
-
-test_that("include inside function", {
- # Only run the test if plyr is installed.
- if ("plyr" %in% rownames(installed.packages())) {
- suppressPackageStartupMessages(library(plyr))
- generateData <- function(x) {
- suppressPackageStartupMessages(library(plyr))
- attach(airquality)
- result <- transform(Ozone, logOzone = log(Ozone))
- result
- }
-
- data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)
- }
-})
-
-test_that("use include package", {
- # Only run the test if plyr is installed.
- if ("plyr" %in% rownames(installed.packages())) {
- suppressPackageStartupMessages(library(plyr))
- generateData <- function(x) {
- attach(airquality)
- result <- transform(Ozone, logOzone = log(Ozone))
- result
- }
-
- includePackage(sc, plyr)
- data <- lapplyPartition(rdd, generateData)
- actual <- collect(data)  # no assertion: smoke test that includePackage() shipped plyr
- }
-})
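
The two tests in the deleted file above contrast the two ways of getting an installed R package onto
the workers: calling library() inside every shipped closure, versus calling includePackage(sc, pkg)
once so the package is attached before partition functions run. A minimal sketch of the second style,
assuming a local SparkR 1.x session and that plyr is installed:

    library(SparkR)

    sc <- sparkR.init("local[2]")
    rdd <- parallelize(sc, 1:4, 2L)

    # Ship the (already installed) plyr package name to the executors once,
    # instead of calling library(plyr) inside every closure.
    includePackage(sc, plyr)

    doubled <- lapplyPartition(rdd, function(part) {
      # plyr is attached on the worker by the time this closure runs
      llply(part, function(x) x * 2)
    })
    collect(doubled)
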
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
deleted file mode 100644
index f272de7..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_mllib.R
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib functions")
-
-# Tests for MLlib functions in SparkR
-
-sc <- sparkR.init()
-
-sqlContext <- sparkRSQL.init(sc)
-
-test_that("glm and predict", {
- training <- createDataFrame(sqlContext, iris)
- test <- select(training, "Sepal_Length")
- model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
- prediction <- predict(model, test)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
-})
-
-test_that("predictions match with native glm", {
- training <- createDataFrame(sqlContext, iris)
- model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("dot minus and intercept vs native glm", {
- training <- createDataFrame(sqlContext, iris)
- model <- glm(Sepal_Width ~ . - Species + 0, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("summary coefficients match with native glm", {
- training <- createDataFrame(sqlContext, iris)
- stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
- coefs <- as.vector(stats$coefficients)
- rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)))
- expect_true(all(abs(rCoefs - coefs) < 1e-6))
- expect_true(all(
- as.character(stats$features) ==
- c("(Intercept)", "Sepal_Length", "Species__versicolor", "Species__virginica")))
-})
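
Worth noting for anyone reading the MLlib tests above: createDataFrame() rewrites R's dotted column
names (Sepal.Width becomes Sepal_Width), and SparkR's formula interface dummy-encodes factor levels
as Species__versicolor / Species__virginica, which is why the asserted coefficient names differ from
native glm(). A minimal sketch of the comparison those tests perform, assuming a SparkR 1.x session:

    library(SparkR)

    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)

    # Dots in iris's column names become underscores in the Spark DataFrame
    training <- createDataFrame(sqlContext, iris)

    # Fit on Spark, then compare predictions against native glm() on the same data
    model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, family = "gaussian")
    vals <- collect(select(predict(model, training), "prediction"))
    rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
    all(abs(rVals - vals) < 1e-6)  # should print TRUE
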
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/2eb26cd3/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R b/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
deleted file mode 100644
index 2552127..0000000
--- a/sparkr-interpreter/src/main/resources/R/pkg/inst/tests/test_parallelize_collect.R
+++ /dev/null
@@ -1,109 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("parallelize() and collect()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
- "violated, but I'm not. No, in fact, I think this is a friendly",
- "message, like \"Hey, wanna play?\" and yes, I want to play. ",
- "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
- "other times it helps me control the chaos.",
- "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
- "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
-strPairs <- list(list(strList, strList), list(strList, strList))
-
-# JavaSparkContext handle
-jsc <- sparkR.init()
-
-# Tests
-
-test_that("parallelize() on simple vectors and lists returns an RDD", {
- numVectorRDD <- parallelize(jsc, numVector, 1)
- numVectorRDD2 <- parallelize(jsc, numVector, 10)
- numListRDD <- parallelize(jsc, numList, 1)
- numListRDD2 <- parallelize(jsc, numList, 4)
- strVectorRDD <- parallelize(jsc, strVector, 2)
- strVectorRDD2 <- parallelize(jsc, strVector, 3)
- strListRDD <- parallelize(jsc, strList, 4)
- strListRDD2 <- parallelize(jsc, strList, 1)
-
- rdds <- c(numVectorRDD,
- numVectorRDD2,
- numListRDD,
- numListRDD2,
- strVectorRDD,
- strVectorRDD2,
- strListRDD,
- strListRDD2)
-
- for (rdd in rdds) {
- expect_is(rdd, "RDD")
- expect_true(.hasSlot(rdd, "jrdd")
- && inherits(rdd@jrdd, "jobj")
- && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
- }
-})
-
-test_that("collect(), following a parallelize(), gives back the original collections", {
- numVectorRDD <- parallelize(jsc, numVector, 10)
- expect_equal(collect(numVectorRDD), as.list(numVector))
-
- numListRDD <- parallelize(jsc, numList, 1)
- numListRDD2 <- parallelize(jsc, numList, 4)
- expect_equal(collect(numListRDD), as.list(numList))
- expect_equal(collect(numListRDD2), as.list(numList))
-
- strVectorRDD <- parallelize(jsc, strVector, 2)
- strVectorRDD2 <- parallelize(jsc, strVector, 3)
- expect_equal(collect(strVectorRDD), as.list(strVector))
- expect_equal(collect(strVectorRDD2), as.list(strVector))
-
- strListRDD <- parallelize(jsc, strList, 4)
- strListRDD2 <- parallelize(jsc, strList, 1)
- expect_equal(collect(strListRDD), as.list(strList))
- expect_equal(collect(strListRDD2), as.list(strList))
-})
-
-test_that("regression: collect() following a parallelize() does not drop elements", {
- # 10 %/% 6 = 1, ceiling(10 / 6) = 2
- collLen <- 10
- numPart <- 6
- expected <- runif(collLen)
- actual <- collect(parallelize(jsc, expected, numPart))
- expect_equal(actual, as.list(expected))
-})
-
-test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
- # use the pairwise logical to indicate pairwise data
- numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
- numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
- numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
- expect_equal(collect(numPairsRDDD1), numPairs)
- expect_equal(collect(numPairsRDDD2), numPairs)
- expect_equal(collect(numPairsRDDD3), numPairs)
- # can also leave out the parameter name, if the params are supplied in order
- strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
- strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
- expect_equal(collect(strPairsRDDD1), strPairs)
- expect_equal(collect(strPairsRDDD2), strPairs)
-})
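
Taken together, the deleted collection tests pin down one easily stated invariant: for any vector or
list x and any slice count n, collect(parallelize(sc, x, n)) must equal as.list(x), preserving order
and element count even when length(x) does not divide evenly by n. A minimal sketch of that round
trip, assuming a local SparkR 1.x session:

    library(SparkR)

    sc <- sparkR.init("local[2]")

    x <- runif(10)
    # 10 elements over 6 slices gives uneven partitions (10 %/% 6 = 1 per slice,
    # plus remainders), yet collect() must return every element, in order.
    rdd <- parallelize(sc, x, 6L)
    stopifnot(isTRUE(all.equal(collect(rdd), as.list(x))))
    sparkR.stop()
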