Posted to commits@spark.apache.org by sh...@apache.org on 2015/12/07 19:38:23 UTC
[2/4] spark git commit: [SPARK-12034][SPARKR] Eliminate warnings in SparkR test cases.
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_binaryFile.R b/R/pkg/inst/tests/testthat/test_binaryFile.R
new file mode 100644
index 0000000..f2452ed
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_binaryFile.R
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions on binary files")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1, 1)
+ saveAsObjectFile(rdd, fileName2)
+ rdd <- objectFile(sc, fileName2)
+ expect_equal(collect(rdd), as.list(mockFile))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+
+ l <- list(1, 2, 3)
+ rdd <- parallelize(sc, l, 1)
+ saveAsObjectFile(rdd, fileName)
+ rdd <- objectFile(sc, fileName)
+ expect_equal(collect(rdd), l)
+
+ unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
+
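+ # Word count: split each line into words, pair each word with 1, then sum counts by key.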
+ words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ counts <- reduceByKey(wordCount, "+", 2L)
+
+ saveAsObjectFile(counts, fileName2)
+ counts <- objectFile(sc, fileName2)
+
+ output <- collect(counts)
+ expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+ list("is", 2))
+ expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+ fileName1 <- tempfile(pattern="spark-test", fileext=".tmp")
+ fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
+
+ rdd1 <- parallelize(sc, "Spark is pretty.")
+ saveAsObjectFile(rdd1, fileName1)
+ rdd2 <- parallelize(sc, "Spark is awesome.")
+ saveAsObjectFile(rdd2, fileName2)
+
+ rdd <- objectFile(sc, c(fileName1, fileName2))
+ expect_equal(count(rdd), 2)
+
+ unlink(fileName1, recursive = TRUE)
+ unlink(fileName2, recursive = TRUE)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_binary_function.R b/R/pkg/inst/tests/testthat/test_binary_function.R
new file mode 100644
index 0000000..f054ac9
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_binary_function.R
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("binary functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+# File content
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("union on two RDDs", {
+ actual <- collect(unionRDD(rdd, rdd))
+ expect_equal(actual, as.list(rep(nums, 2)))
+
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
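+ # Uniting a byte-serialized RDD with a string-serialized text RDD yields a byte-serialized result.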
+ text.rdd <- textFile(sc, fileName)
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collect(union.rdd)
+ expect_equal(actual, c(as.list(nums), mockFile))
+ expect_equal(getSerializedMode(union.rdd), "byte")
+
+ rdd <- map(text.rdd, function(x) { x })
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collect(union.rdd)
+ expect_equal(actual, as.list(c(mockFile, mockFile)))
+ expect_equal(getSerializedMode(union.rdd), "byte")
+
+ unlink(fileName)
+})
+
+test_that("cogroup on two RDDs", {
+ rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+ rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+ actual <- collect(cogroup.rdd)
+ expect_equal(actual,
+ list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
+
+ rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
+ rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+ actual <- collect(cogroup.rdd)
+
+ expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+})
+
+test_that("zipPartitions() on RDDs", {
+ rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+ rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+ rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
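+ # zipPartitions() requires the RDDs to have the same number of partitions;
+ # the element counts per partition may differ.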
+ actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
+ func = function(x, y, z) { list(list(x, y, z))} ))
+ expect_equal(actual,
+ list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
+
+ mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collect(zipPartitions(rdd, rdd,
+ func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+ expected <- list(paste(mockFile, mockFile, sep = "\n"))
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collect(zipPartitions(rdd1, rdd,
+ func = function(x, y) { list(x + nchar(y)) }))
+ expected <- list(0:1 + nchar(mockFile))
+ expect_equal(actual, expected)
+
+ rdd <- map(rdd, function(x) { x })
+ actual <- collect(zipPartitions(rdd, rdd1,
+ func = function(x, y) { list(y + nchar(x)) }))
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_broadcast.R b/R/pkg/inst/tests/testthat/test_broadcast.R
new file mode 100644
index 0000000..bb86a5c
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_broadcast.R
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("broadcast variables")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rrdd <- parallelize(sc, nums, 2L)
+
+test_that("using broadcast variable", {
+ randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+ randomMatBr <- broadcast(sc, randomMat)
+
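+ # Tasks read the broadcast matrix through SparkR:::value() rather than
+ # capturing the matrix itself in the closure.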
+ useBroadcast <- function(x) {
+ sum(SparkR:::value(randomMatBr) * x)
+ }
+ actual <- collect(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
+
+test_that("without using broadcast variable", {
+ randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
+
+ useBroadcast <- function(x) {
+ sum(randomMat * x)
+ }
+ actual <- collect(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_client.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_client.R b/R/pkg/inst/tests/testthat/test_client.R
new file mode 100644
index 0000000..a0664f3
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_client.R
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in client.R")
+
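+# generateSparkSubmitArgs() builds the spark-submit command line; in the calls
+# below, the third positional argument carries jars and the fifth carries packages.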
+test_that("adding spark-testing-base as a package works", {
+ args <- generateSparkSubmitArgs("", "", "", "",
+ "holdenk:spark-testing-base:1.3.0_0.0.5")
+ expect_equal(gsub("[[:space:]]", "", args),
+ gsub("[[:space:]]", "",
+ "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
+})
+
+test_that("no package specified doesn't add packages flag", {
+ args <- generateSparkSubmitArgs("", "", "", "", "")
+ expect_equal(gsub("[[:space:]]", "", args),
+ "")
+})
+
+test_that("multiple packages don't produce a warning", {
+ expect_that(generateSparkSubmitArgs("", "", "", "", c("A", "B")), not(gives_warning()))
+})
+
+test_that("sparkJars sparkPackages as character vectors", {
+ args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
+ c("com.databricks:spark-avro_2.10:2.0.1",
+ "com.databricks:spark-csv_2.10:1.3.0"))
+ expect_match(args, "--jars one.jar,two.jar,three.jar")
+ expect_match(args,
+ "--packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.3.0")
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
new file mode 100644
index 0000000..1707e31
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("test functions in sparkR.R")
+
+test_that("repeatedly starting and stopping SparkR", {
+ for (i in 1:4) {
+ sc <- sparkR.init()
+ rdd <- parallelize(sc, 1:20, 2L)
+ expect_equal(count(rdd), 20)
+ sparkR.stop()
+ }
+})
+
+test_that("repeatedly starting and stopping SparkR SQL", {
+ for (i in 1:4) {
+ sc <- sparkR.init()
+ sqlContext <- sparkRSQL.init(sc)
+ df <- createDataFrame(sqlContext, data.frame(a = 1:20))
+ expect_equal(count(df), 20)
+ sparkR.stop()
+ }
+})
+
+test_that("rdd GC across sparkR.stop", {
+ sparkR.stop()
+ sc <- sparkR.init() # sc should get id 0
+ rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
+ rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
+ sparkR.stop()
+
+ sc <- sparkR.init() # sc should get id 0 again
+
+ # GC rdd1 before creating rdd3 and rdd2 after
+ rm(rdd1)
+ gc()
+
+ rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
+ rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
+
+ rm(rdd2)
+ gc()
+
+ count(rdd3)
+ count(rdd4)
+})
+
+test_that("job group functions can be called", {
+ sc <- sparkR.init()
+ setJobGroup(sc, "groupId", "job description", TRUE)
+ cancelJobGroup(sc, "groupId")
+ clearJobGroup(sc)
+})
+
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+ e <- new.env()
+ e[["spark.driver.memory"]] <- "512m"
+ ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
+ expect_equal("--driver-memory \"512m\" sparkrmain", ops)
+
+ e[["spark.driver.memory"]] <- "5g"
+ e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
+ e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
+ e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
+ e[["random"]] <- "skipthis"
+ ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
+ # nolint start
+ expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
+ "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
+ "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
+ # nolint end
+
+ e[["spark.driver.extraClassPath"]] <- "/" # too short
+ ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
+ # nolint start
+ expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
+ "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
+ " --driver-memory 4g sparkr-shell2"))
+ # nolint end
+})
+
+test_that("sparkJars sparkPackages as comma-separated strings", {
+ expect_warning(processSparkJars(" a, b "))
+ jars <- suppressWarnings(processSparkJars(" a, b "))
+ expect_equal(jars, c("a", "b"))
+
+ jars <- suppressWarnings(processSparkJars(" abc ,, def "))
+ expect_equal(jars, c("abc", "def"))
+
+ jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
+ expect_equal(jars, c("abc", "def", "xyz", "a", "b"))
+
+ p <- processSparkPackages(c("ghi", "lmn"))
+ expect_equal(p, c("ghi", "lmn"))
+
+ # check normalizePath
+ f <- dir()[[1]]
+ expect_that(processSparkJars(f), not(gives_warning()))
+ expect_match(processSparkJars(f), f)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_includeJAR.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_includeJAR.R b/R/pkg/inst/tests/testthat/test_includeJAR.R
new file mode 100644
index 0000000..f89aa8e
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_includeJAR.R
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+context("include an external JAR in SparkContext")
+
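+# Runs jarTest.R through bin/spark-submit with the bundled test jar on --jars,
+# returning the last two lines of its stdout.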
+runScript <- function() {
+ sparkHome <- Sys.getenv("SPARK_HOME")
+ sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
+ jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
+ scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/testthat/jarTest.R")
+ submitPath <- file.path(sparkHome, "bin/spark-submit")
+ res <- system2(command = submitPath,
+ args = c(jarPath, scriptPath),
+ stdout = TRUE)
+ tail(res, 2)
+}
+
+test_that("sparkJars tag in SparkContext", {
+ testOutput <- runScript()
+ helloTest <- testOutput[1]
+ expect_equal(helloTest, "Hello, Dave")
+ basicFunction <- testOutput[2]
+ expect_equal(basicFunction, "4")
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_includePackage.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_includePackage.R b/R/pkg/inst/tests/testthat/test_includePackage.R
new file mode 100644
index 0000000..8152b44
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_includePackage.R
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("include R packages")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Partitioned data
+nums <- 1:2
+rdd <- parallelize(sc, nums, 2L)
+
+test_that("include inside function", {
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ suppressPackageStartupMessages(library(plyr))
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collect(data)
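+ # No explicit assertion; the test passes if collect() completes without error.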
+ }
+})
+
+test_that("use include package", {
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ includePackage(sc, plyr)
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collect(data)
+ }
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
new file mode 100644
index 0000000..08099dd
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib functions")
+
+# Tests for MLlib functions in SparkR
+
+sc <- sparkR.init()
+
+sqlContext <- sparkRSQL.init(sc)
+
+test_that("glm and predict", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
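+ # createDataFrame() warns and replaces "." in the iris column names with "_",
+ # hence Sepal_Length/Sepal_Width below.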
+ test <- select(training, "Sepal_Length")
+ model <- glm(Sepal_Width ~ Sepal_Length, training, family = "gaussian")
+ prediction <- predict(model, test)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+
+ # Test stats::predict is working
+ x <- rnorm(15)
+ y <- x + rnorm(15)
+ expect_equal(length(predict(lm(y ~ x))), 15)
+})
+
+test_that("glm should work with long formula", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ training$LongLongLongLongLongName <- training$Sepal_Width
+ training$VeryLongLongLongLonLongName <- training$Sepal_Length
+ training$AnotherLongLongLongLongName <- training$Species
+ model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
+ data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("predictions match with native glm", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("dot minus and intercept vs native glm", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ model <- glm(Sepal_Width ~ . - Species + 0, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("feature interaction vs native glm", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("summary coefficients match with native glm", {
+ training <- suppressWarnings(createDataFrame(sqlContext, iris))
+ stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal"))
+ coefs <- unlist(stats$coefficients)
+ devianceResiduals <- unlist(stats$devianceResiduals)
+
+ rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
+ rCoefs <- unlist(rStats$coefficients)
+ rDevianceResiduals <- c(-0.95096, 0.72918)
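+ # Reference deviance residuals precomputed from the equivalent native R fit.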
+
+ expect_true(all(abs(rCoefs - coefs) < 1e-5))
+ expect_true(all(abs(rDevianceResiduals - devianceResiduals) < 1e-5))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
+})
+
+test_that("summary coefficients match with native glm of family 'binomial'", {
+ df <- suppressWarnings(createDataFrame(sqlContext, iris))
+ training <- filter(df, df$Species != "setosa")
+ stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
+ family = "binomial"))
+ coefs <- as.vector(stats$coefficients[,1])
+
+ rTraining <- iris[iris$Species %in% c("versicolor","virginica"),]
+ rCoefs <- as.vector(coef(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
+ family = binomial(link = "logit"))))
+
+ expect_true(all(abs(rCoefs - coefs) < 1e-4))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Sepal_Width")))
+})
+
+test_that("summary works on base GLM models", {
+ baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
+ baseSummary <- summary(baseModel)
+ expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_parallelize_collect.R b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
new file mode 100644
index 0000000..2552127
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("parallelize() and collect()")
+
+# Mock data
+numVector <- c(-10:97)
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
+strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
+ "violated, but I'm not. No, in fact, I think this is a friendly",
+ "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+ "I really, really do.")
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+ "other times it helps me control the chaos.",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+ "raising me. But they're both dead now. I didn't kill them. Honest.")
+
+numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
+strPairs <- list(list(strList, strList), list(strList, strList))
+
+# JavaSparkContext handle
+jsc <- sparkR.init()
+
+# Tests
+
+test_that("parallelize() on simple vectors and lists returns an RDD", {
+ numVectorRDD <- parallelize(jsc, numVector, 1)
+ numVectorRDD2 <- parallelize(jsc, numVector, 10)
+ numListRDD <- parallelize(jsc, numList, 1)
+ numListRDD2 <- parallelize(jsc, numList, 4)
+ strVectorRDD <- parallelize(jsc, strVector, 2)
+ strVectorRDD2 <- parallelize(jsc, strVector, 3)
+ strListRDD <- parallelize(jsc, strList, 4)
+ strListRDD2 <- parallelize(jsc, strList, 1)
+
+ rdds <- c(numVectorRDD,
+ numVectorRDD2,
+ numListRDD,
+ numListRDD2,
+ strVectorRDD,
+ strVectorRDD2,
+ strListRDD,
+ strListRDD2)
+
+ for (rdd in rdds) {
+ expect_is(rdd, "RDD")
+ expect_true(.hasSlot(rdd, "jrdd")
+ && inherits(rdd@jrdd, "jobj")
+ && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
+ }
+})
+
+test_that("collect(), following a parallelize(), gives back the original collections", {
+ numVectorRDD <- parallelize(jsc, numVector, 10)
+ expect_equal(collect(numVectorRDD), as.list(numVector))
+
+ numListRDD <- parallelize(jsc, numList, 1)
+ numListRDD2 <- parallelize(jsc, numList, 4)
+ expect_equal(collect(numListRDD), as.list(numList))
+ expect_equal(collect(numListRDD2), as.list(numList))
+
+ strVectorRDD <- parallelize(jsc, strVector, 2)
+ strVectorRDD2 <- parallelize(jsc, strVector, 3)
+ expect_equal(collect(strVectorRDD), as.list(strVector))
+ expect_equal(collect(strVectorRDD2), as.list(strVector))
+
+ strListRDD <- parallelize(jsc, strList, 4)
+ strListRDD2 <- parallelize(jsc, strList, 1)
+ expect_equal(collect(strListRDD), as.list(strList))
+ expect_equal(collect(strListRDD2), as.list(strList))
+})
+
+test_that("regression: collect() following a parallelize() does not drop elements", {
+ # 10 %/% 6 = 1, ceiling(10 / 6) = 2
+ collLen <- 10
+ numPart <- 6
+ expected <- runif(collLen)
+ actual <- collect(parallelize(jsc, expected, numPart))
+ expect_equal(actual, as.list(expected))
+})
+
+test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
+ # Pairwise data is represented as a list of two-element lists.
+ numPairsRDD1 <- parallelize(jsc, numPairs, 1)
+ numPairsRDD2 <- parallelize(jsc, numPairs, 2)
+ numPairsRDD3 <- parallelize(jsc, numPairs, 3)
+ expect_equal(collect(numPairsRDD1), numPairs)
+ expect_equal(collect(numPairsRDD2), numPairs)
+ expect_equal(collect(numPairsRDD3), numPairs)
+ # The same round-trip holds for pairs whose values are lists of strings.
+ strPairsRDD1 <- parallelize(jsc, strPairs, 1)
+ strPairsRDD2 <- parallelize(jsc, strPairs, 2)
+ expect_equal(collect(strPairsRDD1), strPairs)
+ expect_equal(collect(strPairsRDD2), strPairs)
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
new file mode 100644
index 0000000..7423b4f
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -0,0 +1,793 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("basic RDD functions")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+test_that("get number of partitions in RDD", {
+ expect_equal(getNumPartitions(rdd), 2)
+ expect_equal(getNumPartitions(intRdd), 2)
+})
+
+test_that("first on RDD", {
+ expect_equal(first(rdd), 1)
+ newrdd <- lapply(rdd, function(x) x + 1)
+ expect_equal(first(newrdd), 2)
+})
+
+test_that("count and length on RDD", {
+ expect_equal(count(rdd), 10)
+ expect_equal(length(rdd), 10)
+})
+
+test_that("count by values and keys", {
+ mods <- lapply(rdd, function(x) { x %% 3 })
+ actual <- countByValue(mods)
+ expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ actual <- countByKey(intRdd)
+ expected <- list(list(2L, 2L), list(1L, 2L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("lapply on RDD", {
+ multiples <- lapply(rdd, function(x) { 2 * x })
+ actual <- collect(multiples)
+ expect_equal(actual, as.list(nums * 2))
+})
+
+test_that("lapplyPartition on RDD", {
+ sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
+ actual <- collect(sums)
+ expect_equal(actual, list(15, 40))
+})
+
+test_that("mapPartitions on RDD", {
+ sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
+ actual <- collect(sums)
+ expect_equal(actual, list(15, 40))
+})
+
+test_that("flatMap() on RDDs", {
+ flat <- flatMap(intRdd, function(x) { list(x, x) })
+ actual <- collect(flat)
+ expect_equal(actual, rep(intPairs, each=2))
+})
+
+test_that("filterRDD on RDD", {
+ filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list(2, 4, 6, 8, 10))
+
+ filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list(list(1L, -1)))
+
+ # Filter out all elements.
+ filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
+ actual <- collect(filtered.rdd)
+ expect_equal(actual, list())
+})
+
+test_that("lookup on RDD", {
+ vals <- lookup(intRdd, 1L)
+ expect_equal(vals, list(-1, 200))
+
+ vals <- lookup(intRdd, 3L)
+ expect_equal(vals, list())
+})
+
+test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
+ rdd2 <- rdd
+ for (i in 1:12)
+ rdd2 <- lapplyPartitionsWithIndex(
+ rdd2, function(partIndex, part) {
+ part <- as.list(unlist(part) * partIndex + i)
+ })
+ rdd2 <- lapply(rdd2, function(x) x + x)
+ actual <- collect(rdd2)
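+ # Partition 0: unlist(part) * 0 + i collapses every value to i, ending at 12; doubled to 24.
+ # Partition 1: each pass adds i to every value, so x + sum(1:12) = x + 78; doubled to 168..176.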
+ expected <- list(24, 24, 24, 24, 24,
+ 168, 170, 172, 174, 176)
+ expect_equal(actual, expected)
+})
+
+test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
+ # RDD
+ rdd2 <- rdd
+ # PipelinedRDD
+ rdd2 <- lapplyPartitionsWithIndex(
+ rdd2,
+ function(partIndex, part) {
+ part <- as.list(unlist(part) * partIndex)
+ })
+
+ cache(rdd2)
+ expect_true(rdd2@env$isCached)
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+
+ unpersist(rdd2)
+ expect_false(rdd2@env$isCached)
+
+ persist(rdd2, "MEMORY_AND_DISK")
+ expect_true(rdd2@env$isCached)
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+
+ unpersist(rdd2)
+ expect_false(rdd2@env$isCached)
+
+ tempDir <- tempfile(pattern = "checkpoint")
+ setCheckpointDir(sc, tempDir)
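+ # checkpoint() only marks the RDD; the data is written when the next action runs.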
+ checkpoint(rdd2)
+ expect_true(rdd2@env$isCheckpointed)
+
+ rdd2 <- lapply(rdd2, function(x) x)
+ expect_false(rdd2@env$isCached)
+ expect_false(rdd2@env$isCheckpointed)
+
+ # make sure the data is collectable
+ collect(rdd2)
+
+ unlink(tempDir)
+})
+
+test_that("reduce on RDD", {
+ sum <- reduce(rdd, "+")
+ expect_equal(sum, 55)
+
+ # Also test with an inline function
+ sumInline <- reduce(rdd, function(x, y) { x + y })
+ expect_equal(sumInline, 55)
+})
+
+test_that("lapply with dependency", {
+ fa <- 5
+ multiples <- lapply(rdd, function(x) { fa * x })
+ actual <- collect(multiples)
+
+ expect_equal(actual, as.list(nums * 5))
+})
+
+test_that("lapplyPartitionsWithIndex on RDDs", {
+ func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
+ actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
+ expect_equal(actual, list(list(0, 15), list(1, 40)))
+
+ pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
+ partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
+ mkTup <- function(partIndex, part) { list(partIndex, part) }
+ actual <- collect(lapplyPartitionsWithIndex(
+ partitionBy(pairsRDD, 2L, partitionByParity),
+ mkTup),
+ FALSE)
+ expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
+ list(1, list(list(4, 8)))))
+})
+
+test_that("sampleRDD() on RDDs", {
+ expect_equal(unlist(collect(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
+})
+
+test_that("takeSample() on RDDs", {
+ # ported from RDDSuite.scala, modified seeds
+ data <- parallelize(sc, 1:100, 2L)
+ for (seed in 4:5) {
+ s <- takeSample(data, FALSE, 20L, seed)
+ expect_equal(length(s), 20L)
+ expect_equal(length(unique(s)), 20L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, FALSE, 200L, seed)
+ expect_equal(length(s), 100L)
+ expect_equal(length(unique(s)), 100L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 20L, seed)
+ expect_equal(length(s), 20L)
+ for (elem in s) {
+ expect_true(elem >= 1 && elem <= 100)
+ }
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 100L, seed)
+ expect_equal(length(s), 100L)
+ # Chance of getting all distinct elements is astronomically low, so test we
+ # got < 100
+ expect_true(length(unique(s)) < 100L)
+ }
+ for (seed in 4:5) {
+ s <- takeSample(data, TRUE, 200L, seed)
+ expect_equal(length(s), 200L)
+ # Chance of getting all distinct elements is still quite low, so test we
+ # got < 100
+ expect_true(length(unique(s)) < 100L)
+ }
+})
+
+test_that("mapValues() on pairwise RDDs", {
+ multiples <- mapValues(intRdd, function(x) { x * 2 })
+ actual <- collect(multiples)
+ expected <- lapply(intPairs, function(x) {
+ list(x[[1]], x[[2]] * 2)
+ })
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("flatMapValues() on pairwise RDDs", {
+ l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
+ actual <- collect(flatMapValues(l, function(x) { x }))
+ expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4)))
+
+ # Generate x to x+1 for every value
+ actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) }))
+ expect_equal(actual,
+ list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
+ list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
+})
+
+test_that("reduceByKeyLocally() on PairwiseRDDs", {
+ pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
+ actual <- reduceByKeyLocally(pairs, "+")
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, 6), list(1.1, 3))))
+
+ pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
+ list("bb", 5)), 4L)
+ actual <- reduceByKeyLocally(pairs, "+")
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
+})
+
+test_that("distinct() on RDDs", {
+ nums.rep2 <- rep(1:10, 2)
+ rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
+ uniques <- distinct(rdd.rep2)
+ actual <- sort(unlist(collect(uniques)))
+ expect_equal(actual, nums)
+})
+
+test_that("maximum() on RDDs", {
+ max <- maximum(rdd)
+ expect_equal(max, 10)
+})
+
+test_that("minimum() on RDDs", {
+ min <- minimum(rdd)
+ expect_equal(min, 1)
+})
+
+test_that("sumRDD() on RDDs", {
+ sum <- sumRDD(rdd)
+ expect_equal(sum, 55)
+})
+
+test_that("keyBy on RDDs", {
+ func <- function(x) { x * x }
+ keys <- keyBy(rdd, func)
+ actual <- collect(keys)
+ expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
+})
+
+test_that("repartition/coalesce on RDDs", {
+ rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
+
+ # repartition
+ r1 <- repartition(rdd, 2)
+ expect_equal(getNumPartitions(r1), 2L)
+ count <- length(collectPartition(r1, 0L))
+ expect_true(count >= 8 && count <= 12)
+
+ r2 <- repartition(rdd, 6)
+ expect_equal(getNumPartitions(r2), 6L)
+ count <- length(collectPartition(r2, 0L))
+ expect_true(count >= 0 && count <= 4)
+
+ # coalesce
+ r3 <- coalesce(rdd, 1)
+ expect_equal(getNumPartitions(r3), 1L)
+ count <- length(collectPartition(r3, 0L))
+ expect_equal(count, 20)
+})
+
+test_that("sortBy() on RDDs", {
+ sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
+ actual <- collect(sortedRdd)
+ expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
+
+ rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+ sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
+ actual <- collect(sortedRdd2)
+ expect_equal(actual, as.list(nums))
+})
+
+test_that("takeOrdered() on RDDs", {
+ l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+ rdd <- parallelize(sc, l)
+ actual <- takeOrdered(rdd, 6L)
+ expect_equal(actual, as.list(sort(unlist(l)))[1:6])
+
+ l <- list("e", "d", "c", "d", "a")
+ rdd <- parallelize(sc, l)
+ actual <- takeOrdered(rdd, 3L)
+ expect_equal(actual, as.list(sort(unlist(l)))[1:3])
+})
+
+test_that("top() on RDDs", {
+ l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+ rdd <- parallelize(sc, l)
+ actual <- top(rdd, 6L)
+ expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
+
+ l <- list("e", "d", "c", "d", "a")
+ rdd <- parallelize(sc, l)
+ actual <- top(rdd, 3L)
+ expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
+})
+
+test_that("fold() on RDDs", {
+ actual <- fold(rdd, 0, "+")
+ expect_equal(actual, Reduce("+", nums, 0))
+
+ rdd <- parallelize(sc, list())
+ actual <- fold(rdd, 0, "+")
+ expect_equal(actual, 0)
+})
+
+test_that("aggregateRDD() on RDDs", {
+ rdd <- parallelize(sc, list(1, 2, 3, 4))
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+ actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+ expect_equal(actual, list(10, 4))
+
+ rdd <- parallelize(sc, list())
+ actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
+ expect_equal(actual, list(0, 0))
+})
+
+test_that("zipWithUniqueId() on RDDs", {
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+ actual <- collect(zipWithUniqueId(rdd))
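+ # The k-th item in the n-th partition gets id k * numPartitions + n,
+ # which interleaves ids across partitions.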
+ expected <- list(list("a", 0), list("b", 3), list("c", 1),
+ list("d", 4), list("e", 2))
+ expect_equal(actual, expected)
+
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+ actual <- collect(zipWithUniqueId(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+})
+
+test_that("zipWithIndex() on RDDs", {
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
+ actual <- collect(zipWithIndex(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+
+ rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
+ actual <- collect(zipWithIndex(rdd))
+ expected <- list(list("a", 0), list("b", 1), list("c", 2),
+ list("d", 3), list("e", 4))
+ expect_equal(actual, expected)
+})
+
+test_that("glom() on RDD", {
+ rdd <- parallelize(sc, as.list(1:4), 2L)
+ actual <- collect(glom(rdd))
+ expect_equal(actual, list(list(1, 2), list(3, 4)))
+})
+
+test_that("keys() on RDDs", {
+ keys <- keys(intRdd)
+ actual <- collect(keys)
+ expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
+})
+
+test_that("values() on RDDs", {
+ values <- values(intRdd)
+ actual <- collect(values)
+ expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
+})
+
+test_that("pipeRDD() on RDDs", {
+ actual <- collect(pipeRDD(rdd, "more"))
+ expected <- as.list(as.character(1:10))
+ expect_equal(actual, expected)
+
+ trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
+ actual <- collect(pipeRDD(trailed.rdd, "sort"))
+ expected <- list("", "1", "2", "3")
+ expect_equal(actual, expected)
+
+ rev.nums <- 9:0
+ rev.rdd <- parallelize(sc, rev.nums, 2L)
+ actual <- collect(pipeRDD(rev.rdd, "sort"))
+ expected <- as.list(as.character(c(5:9, 0:4)))
+ expect_equal(actual, expected)
+})
+
+test_that("zipRDD() on RDDs", {
+ rdd1 <- parallelize(sc, 0:4, 2)
+ rdd2 <- parallelize(sc, 1000:1004, 2)
+ actual <- collect(zipRDD(rdd1, rdd2))
+ expect_equal(actual,
+ list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
+
+ mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collect(zipRDD(rdd, rdd))
+ expected <- lapply(mockFile, function(x) { list(x, x) })
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collect(zipRDD(rdd1, rdd))
+ expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
+ expect_equal(actual, expected)
+
+ rdd1 <- map(rdd, function(x) { x })
+ actual <- collect(zipRDD(rdd, rdd1))
+ expected <- lapply(mockFile, function(x) { list(x, x) })
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})
+
+test_that("cartesian() on RDDs", {
+ rdd <- parallelize(sc, 1:3)
+ actual <- collect(cartesian(rdd, rdd))
+ expect_equal(sortKeyValueList(actual),
+ list(
+ list(1, 1), list(1, 2), list(1, 3),
+ list(2, 1), list(2, 2), list(2, 3),
+ list(3, 1), list(3, 2), list(3, 3)))
+
+ # test case where one RDD is empty
+ emptyRdd <- parallelize(sc, list())
+ actual <- collect(cartesian(rdd, emptyRdd))
+ expect_equal(actual, list())
+
+ mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern="spark-test", fileext=".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName)
+ actual <- collect(cartesian(rdd, rdd))
+ expected <- list(
+ list("Spark is awesome.", "Spark is pretty."),
+ list("Spark is awesome.", "Spark is awesome."),
+ list("Spark is pretty.", "Spark is pretty."),
+ list("Spark is pretty.", "Spark is awesome."))
+ expect_equal(sortKeyValueList(actual), expected)
+
+ rdd1 <- parallelize(sc, 0:1)
+ actual <- collect(cartesian(rdd1, rdd))
+ expect_equal(sortKeyValueList(actual),
+ list(
+ list(0, "Spark is pretty."),
+ list(0, "Spark is awesome."),
+ list(1, "Spark is pretty."),
+ list(1, "Spark is awesome.")))
+
+ rdd1 <- map(rdd, function(x) { x })
+ actual <- collect(cartesian(rdd, rdd1))
+ expect_equal(sortKeyValueList(actual), expected)
+
+ unlink(fileName)
+})
+
+test_that("subtract() on RDDs", {
+ l <- list(1, 1, 2, 2, 3, 4)
+ rdd1 <- parallelize(sc, l)
+
+ # subtract by itself
+ actual <- collect(subtract(rdd1, rdd1))
+ expect_equal(actual, list())
+
+ # subtract by an empty RDD
+ rdd2 <- parallelize(sc, list())
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
+ l)
+
+ rdd2 <- parallelize(sc, list(2, 4))
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
+ list(1, 1, 3))
+
+ l <- list("a", "a", "b", "b", "c", "d")
+ rdd1 <- parallelize(sc, l)
+ rdd2 <- parallelize(sc, list("b", "d"))
+ actual <- collect(subtract(rdd1, rdd2))
+ expect_equal(as.list(sort(as.vector(actual, mode="character"))),
+ list("a", "a", "c"))
+})
+
+test_that("subtractByKey() on pairwise RDDs", {
+ l <- list(list("a", 1), list("b", 4),
+ list("b", 5), list("a", 2))
+ rdd1 <- parallelize(sc, l)
+
+ # subtractByKey by itself
+ actual <- collect(subtractByKey(rdd1, rdd1))
+ expect_equal(actual, list())
+
+ # subtractByKey by an empty RDD
+ rdd2 <- parallelize(sc, list())
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(l))
+
+ rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(actual,
+ list(list("b", 4), list("b", 5)))
+
+ l <- list(list(1, 1), list(2, 4),
+ list(2, 5), list(1, 2))
+ rdd1 <- parallelize(sc, l)
+ rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
+ actual <- collect(subtractByKey(rdd1, rdd2))
+ expect_equal(actual,
+ list(list(2, 4), list(2, 5)))
+})
+
+test_that("intersection() on RDDs", {
+ # intersection with self
+ actual <- collect(intersection(rdd, rdd))
+ expect_equal(sort(as.integer(actual)), nums)
+
+ # intersection with an empty RDD
+ emptyRdd <- parallelize(sc, list())
+ actual <- collect(intersection(rdd, emptyRdd))
+ expect_equal(actual, list())
+
+ rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+ rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+ actual <- collect(intersection(rdd1, rdd2))
+ expect_equal(sort(as.integer(actual)), 1:3)
+})
+
+test_that("join() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+ rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+ rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(actual, list())
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(join(rdd1, rdd2, 2L))
+ expect_equal(actual, list())
+})
+
+test_that("leftOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
+ rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",4)))
+ rdd2 <- parallelize(sc, list(list("a",2), list("a",3)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(leftOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+})
+
+test_that("rightOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,2), list(1,3)))
+ rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",2), list("a",3)))
+ rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(rightOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("fullOuterJoin() on pairwise RDDs", {
+ rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
+ rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
+ list(2, list(NULL, 4)), list(3, list(3, NULL)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
+ rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
+ list("a", list(3, 1)), list("c", list(1, NULL)))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+
+ rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
+ rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
+ list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
+
+ rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
+ rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
+ actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
+ list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
+})
+
+test_that("sortByKey() on pairwise RDDs", {
+ numPairsRdd <- map(rdd, function(x) { list(x, x) })
+ sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
+ actual <- collect(sortedRdd)
+ numPairs <- lapply(nums, function(x) { list(x, x) })
+ expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
+
+ rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+ numPairsRdd2 <- map(rdd2, function(x) { list(x, x) })
+ sortedRdd2 <- sortByKey(numPairsRdd2)
+ actual <- collect(sortedRdd2)
+ expect_equal(actual, numPairs)
+
+ # sort by string keys
+ l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
+ rdd3 <- parallelize(sc, l, 2L)
+ sortedRdd3 <- sortByKey(rdd3)
+ actual <- collect(sortedRdd3)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # test on the boundary cases
+
+ # boundary case 1: the RDD to be sorted has only 1 partition
+ rdd4 <- parallelize(sc, l, 1L)
+ sortedRdd4 <- sortByKey(rdd4)
+ actual <- collect(sortedRdd4)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # boundary case 2: the sorted RDD has only 1 partition
+ rdd5 <- parallelize(sc, l, 2L)
+ sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
+ actual <- collect(sortedRdd5)
+ expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
+
+ # boundary case 3: the RDD to be sorted has only 1 element
+ l2 <- list(list("a", 1))
+ rdd6 <- parallelize(sc, l2, 2L)
+ sortedRdd6 <- sortByKey(rdd6)
+ actual <- collect(sortedRdd6)
+ expect_equal(actual, l2)
+
+ # boundary case 4: the RDD to be sorted has 0 element
+ l3 <- list()
+ rdd7 <- parallelize(sc, l3, 2L)
+ sortedRdd7 <- sortByKey(rdd7)
+ actual <- collect(sortedRdd7)
+ expect_equal(actual, l3)
+})
+
+test_that("collectAsMap() on a pairwise RDD", {
+ rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1` = 2, `3` = 4))
+
+ rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(a = 1, b = 2))
+
+ rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
+
+ rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
+ vals <- collectAsMap(rdd)
+ expect_equal(vals, list(`1` = "a", `2` = "b"))
+})
+
+test_that("show()", {
+ rdd <- parallelize(sc, list(1:10))
+ expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
+})
+
+test_that("sampleByKey() on pairwise RDDs", {
+ rdd <- parallelize(sc, 1:2000)
+ pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
+ fractions <- list(a = 0.2, b = 0.1)
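+ # Roughly 0.2 * 1000 even values ("a") and 0.1 * 1000 odd values ("b")
+ # should be sampled, hence the loose bounds below.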
+ sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
+ expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE)
+ expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE)
+ expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE)
+ expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE)
+ expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE)
+ expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE)
+
+ rdd <- parallelize(sc, 1:2000)
+ pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
+ fractions <- list(`2` = 0.2, `3` = 0.1)
+ sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
+ expect_true(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)))
+ expect_true(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)))
+ expect_true(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0)
+ expect_true(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000)
+ expect_true(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0)
+ expect_true(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000)
+})
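+
+# A note on the bounds above: fractions is a per-key sampling rate, and each
+# key covers roughly 1000 of the 2000 elements, so the expected sample sizes
+# are about 1000 * 0.2 = 200 and 1000 * 0.1 = 100; the (100, 300) and
+# (50, 150) windows simply leave generous room for sampling variance.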
http://git-wip-us.apache.org/repos/asf/spark/blob/39d677c8/R/pkg/inst/tests/testthat/test_shuffle.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_shuffle.R b/R/pkg/inst/tests/testthat/test_shuffle.R
new file mode 100644
index 0000000..adf0b91
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_shuffle.R
@@ -0,0 +1,221 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("partitionBy, groupByKey, reduceByKey etc.")
+
+# JavaSparkContext handle
+sc <- sparkR.init()
+
+# Data
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
+doubleRdd <- parallelize(sc, doublePairs, 2L)
+
+numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
+ list(3L, 0))
+numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
+
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
+strListRDD <- parallelize(sc, strList, 4)
+
+test_that("groupByKey for integers", {
+ grouped <- groupByKey(intRdd, 2L)
+
+ actual <- collect(grouped)
+
+ expected <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("groupByKey for doubles", {
+ grouped <- groupByKey(doubleRdd, 2L)
+
+ actual <- collect(grouped)
+
+ expected <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for ints", {
+ reduced <- reduceByKey(intRdd, "+", 2L)
+
+ actual <- collect(reduced)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("reduceByKey for doubles", {
+ reduced <- reduceByKey(doubleRdd, "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
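+
+# An illustrative sketch: the combine function can be any binary R closure,
+# not just the "+" primitive used above, e.g. keeping the per-key maximum.
+test_that("reduceByKey with a user-defined closure", {
+ reduced <- reduceByKey(intRdd, function(a, b) { max(a, b) }, 2L)
+ actual <- collect(reduced)
+ expected <- list(list(2L, 100), list(1L, 200))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})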
+
+test_that("combineByKey for ints", {
+ reduced <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
+
+ actual <- collect(reduced)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for doubles", {
+ reduced <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("combineByKey for characters", {
+ stringKeyRDD <- parallelize(sc,
+ list(list("max", 1L), list("min", 2L),
+ list("other", 3L), list("max", 4L)), 2L)
+ reduced <- combineByKey(stringKeyRDD,
+ function(x) { x }, "+", "+", 2L)
+ actual <- collect(reduced)
+
+ expected <- list(list("max", 5L), list("min", 2L), list("other", 3L))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
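+
+# An illustrative sketch of the three functions combineByKey() takes
+# (createCombiner, mergeValue, mergeCombiners): passing "+" for all of them,
+# as above, yields a plain per-key sum, while a richer accumulator such as
+# (sum, count) works the same way.
+test_that("combineByKey with a (sum, count) accumulator", {
+ sumCount <- combineByKey(intRdd,
+ function(v) { list(v, 1L) },
+ function(acc, v) { list(acc[[1]] + v, acc[[2]] + 1L) },
+ function(a, b) { list(a[[1]] + b[[1]], a[[2]] + b[[2]]) },
+ 2L)
+ actual <- collect(sumCount)
+ expected <- list(list(2L, list(101, 2L)), list(1L, list(199, 2L)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})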
+
+test_that("aggregateByKey", {
+ # test aggregateByKey for int keys
+ rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+ aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+ actual <- collect(aggregatedRDD)
+
+ expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test aggregateByKey for string keys
+ rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
+
+ zeroValue <- list(0, 0)
+ seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+ combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+ aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+ actual <- collect(aggregatedRDD)
+
+ expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
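+
+# The (sum, count) accumulator built by zeroValue/seqOp/combOp makes per-key
+# means easy to derive from the collected result, e.g. 3 / 2 = 1.5 for key "a"
+# and 7 / 2 = 3.5 for key "b".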
+
+test_that("foldByKey", {
+ # test foldByKey for int keys
+ folded <- foldByKey(intRdd, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list(2L, 101), list(1L, 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for double keys
+ folded <- foldByKey(doubleRdd, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list(1.5, 199), list(2.5, 101))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for string keys
+ stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
+
+ stringKeyRDD <- parallelize(sc, stringKeyPairs)
+ folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
+
+ actual <- collect(folded)
+
+ expected <- list(list("b", 101), list("a", 199))
+ expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+ # test foldByKey for empty pair RDD
+ rdd <- parallelize(sc, list())
+ folded <- foldByKey(rdd, 0, "+", 2L)
+ actual <- collect(folded)
+ expected <- list()
+ expect_equal(actual, expected)
+
+ # test foldByKey for RDD with only 1 pair
+ rdd <- parallelize(sc, list(list(1, 1)))
+ folded <- foldByKey(rdd, 0, "+", 2L)
+ actual <- collect(folded)
+ expected <- list(list(1, 1))
+ expect_equal(actual, expected)
+})
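+
+# Note: with a zero value of 0 and "+", foldByKey() matches reduceByKey() on
+# the same data (compare the expectations above with the reduceByKey tests).
+# The zero value seeds the fold within each partition, so a non-identity zero
+# may be applied more than once when a key spans partitions.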
+
+test_that("partitionBy() partitions data correctly", {
+ # Partition by magnitude
+ partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
+
+ resultRDD <- partitionBy(numPairsRdd, 2L, partitionByMagnitude)
+
+ expected_first <- list(list(1, 100), list(2, 200)) # key < 3
+ expected_second <- list(list(4, -1), list(3, 1), list(3, 0)) # key >= 3
+ actual_first <- collectPartition(resultRDD, 0L)
+ actual_second <- collectPartition(resultRDD, 1L)
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("partitionBy works with dependencies", {
+ kOne <- 1
+ partitionByParity <- function(key) { if (key %% 2 == kOne) 7 else 4 }
+
+ # Partition by parity
+ resultRDD <- partitionBy(numPairsRdd, numPartitions = 2L, partitionByParity)
+
+ # even keys map to 4, and 4 %% 2 == 0, so they land in the first partition
+ expected_first <- list(list(2, 200), list(4, -1))
+ # odd keys map to 7, and 7 %% 2 == 1, so they land in the second partition
+ expected_second <- list(list(1, 100), list(3, 1), list(3, 0))
+ actual_first <- collectPartition(resultRDD, 0L)
+ actual_second <- collectPartition(resultRDD, 1L)
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
+
+test_that("test partitionBy with string keys", {
+ words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ resultRDD <- partitionBy(wordCount, 2L)
+ expected_first <- list(list("Dexter", 1), list("Dexter", 1))
+ expected_second <- list(list("and", 1), list("and", 1))
+
+ actual_first <- Filter(function(item) { item[[1]] == "Dexter" },
+ collectPartition(resultRDD, 0L))
+ actual_second <- Filter(function(item) { item[[1]] == "and" },
+ collectPartition(resultRDD, 1L))
+
+ expect_equal(sortKeyValueList(actual_first), sortKeyValueList(expected_first))
+ expect_equal(sortKeyValueList(actual_second), sortKeyValueList(expected_second))
+})
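+
+# The partitionBy() call above supplies no partition function, so the default
+# (a hash of the key) decides placement; that default is what pins "Dexter"
+# to partition 0 and "and" to partition 1 for this data, while the earlier
+# tests override it with a custom function.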