You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/06/11 07:00:39 UTC
[3/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic
tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_fpm.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R
new file mode 100644
index 0000000..4e10ca1
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib frequent pattern mining")
+
+# Tests for MLlib frequent pattern mining algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("spark.fpGrowth", {
+ data <- selectExpr(createDataFrame(data.frame(items = c(
+ "1,2",
+ "1,2",
+ "1,2,3",
+ "1,3"
+ ))), "split(items, ',') as items")
+ # Fit with an explicit numPartitions; the default is exercised at the end of this test.
+ model <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8, numPartitions = 1)
+
+ itemsets <- collect(spark.freqItemsets(model))
+
+ expected_itemsets <- data.frame(
+ items = I(list(list("3"), list("3", "1"), list("2"), list("2", "1"), list("1"))),
+ freq = c(2, 2, 3, 3, 4)
+ )
+
+ expect_equivalent(expected_itemsets, itemsets)
+
+ expected_association_rules <- data.frame(
+ antecedent = I(list(list("2"), list("3"))),
+ consequent = I(list(list("1"), list("1"))),
+ confidence = c(1, 1)
+ )
+
+ expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
+
+ new_data <- selectExpr(createDataFrame(data.frame(items = c(
+ "1,2",
+ "1,3",
+ "2,3"
+ ))), "split(items, ',') as items")
+
+ expected_predictions <- data.frame(
+ items = I(list(list("1", "2"), list("1", "3"), list("2", "3"))),
+ prediction = I(list(list(), list(), list("1")))
+ )
+
+ expect_equivalent(expected_predictions, collect(predict(model, new_data)))
+
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
+ write.ml(model, modelPath, overwrite = TRUE)
+ loaded_model <- read.ml(modelPath)
+
+ expect_equivalent(
+ itemsets,
+ collect(spark.freqItemsets(loaded_model)))
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+ }
+
+ model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8)
+ expect_equal(
+ count(spark.freqItemsets(model_without_numpartitions)),
+ count(spark.freqItemsets(model))
+ )
+
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_recommendation.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_recommendation.R b/R/pkg/tests/fulltests/test_mllib_recommendation.R
new file mode 100644
index 0000000..cc8064f
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_recommendation.R
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib recommendation algorithms")
+
+# Tests for MLlib recommendation algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("spark.als", {
+ data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
+ list(2, 1, 1.0), list(2, 2, 5.0))
+ df <- createDataFrame(data, c("user", "item", "score"))
+ model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item",
+ rank = 10, maxIter = 5, seed = 0, regParam = 0.1)
+ stats <- summary(model)
+ expect_equal(stats$rank, 10)
+ test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item"))
+ predictions <- collect(predict(model, test))
+
+ expect_equal(predictions$prediction, c(-0.1380762, 2.6258414, -1.5018409),
+ tolerance = 1e-4)
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath)) # same path again, overwrite not requested
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+ expect_equal(stats2$rating, "score")
+ userFactors <- collect(stats$userFactors)
+ itemFactors <- collect(stats$itemFactors)
+ userFactors2 <- collect(stats2$userFactors)
+ itemFactors2 <- collect(stats2$itemFactors)
+ # Compare the factors of the original and reloaded model after sorting both by id.
+ orderUser <- order(userFactors$id)
+ orderUser2 <- order(userFactors2$id)
+ expect_equal(userFactors$id[orderUser], userFactors2$id[orderUser2])
+ expect_equal(userFactors$features[orderUser], userFactors2$features[orderUser2])
+
+ orderItem <- order(itemFactors$id)
+ orderItem2 <- order(itemFactors2$id)
+ expect_equal(itemFactors$id[orderItem], itemFactors2$id[orderItem2])
+ expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2])
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+ }
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_regression.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_regression.R b/R/pkg/tests/fulltests/test_mllib_regression.R
new file mode 100644
index 0000000..b05fdd3
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_regression.R
@@ -0,0 +1,480 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib regression algorithms, except for tree-based algorithms")
+
+# Tests for MLlib regression algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("formula of spark.glm", {
+ skip_on_cran()
+
+ training <- suppressWarnings(createDataFrame(iris))
+ # directly calling the spark API
+ # "." expansion, term removal ("- Species") and no intercept ("+ 0") vs native glm
+ model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # interaction term (Species:Sepal_Length) vs native glm
+ model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # spark.glm should work with a formula built from long column names
+ training <- suppressWarnings(createDataFrame(iris))
+ training$LongLongLongLongLongName <- training$Sepal_Width
+ training$VeryLongLongLongLonLongName <- training$Sepal_Length
+ training$AnotherLongLongLongLongName <- training$Species
+ model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName +
+ AnotherLongLongLongLongName)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("spark.glm and predict", {
+ training <- suppressWarnings(createDataFrame(iris))
+ # gaussian family: predictions must be doubles and match native glm
+ model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # poisson family with identity link
+ model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
+ family = poisson(link = identity))
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+ rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
+ data = iris, family = poisson(link = identity)), iris))
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # Gamma family: fit on simulated data and check that the printed summary names the family
+ x <- runif(100, -1, 1)
+ y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
+ df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
+ model <- glm(y ~ x, family = Gamma, df)
+ out <- capture.output(print(summary(model)))
+ expect_true(any(grepl("Dispersion parameter for gamma family", out)))
+
+ # tweedie family
+ model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
+ family = "tweedie", var.power = 1.2, link.power = 0.0)
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+
+ # manual calculation of the R predicted values to avoid dependence on statmod
+ #' library(statmod)
+ #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+ #' family = tweedie(var.power = 1.2, link.power = 0.0))
+ #' print(coef(rModel))
+ # rCoef: presumably the coef(rModel) values printed by the snippet above
+ rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+ rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+ data = iris) %*% rCoef))
+ expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
+ # Test stats::predict is working: predict() on a base lm fit returns one value per row
+ x <- rnorm(15)
+ y <- x + rnorm(15)
+ expect_equal(length(predict(lm(y ~ x))), 15)
+})
+
+test_that("spark.glm summary", {
+ # gaussian family
+ training <- suppressWarnings(createDataFrame(iris))
+ stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
+ rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
+
+ # coefficients must be a matrix; is.matrix() because class() has length 2 on R >= 4.0
+ expect_true(is.matrix(stats$coefficients))
+ expect_true(class(stats$coefficients[, 1]) == "numeric")
+
+ coefs <- stats$coefficients
+ rCoefs <- rStats$coefficients
+ expect_true(all(abs(rCoefs - coefs) < 1e-4))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
+ expect_equal(stats$dispersion, rStats$dispersion)
+ expect_equal(stats$null.deviance, rStats$null.deviance)
+ expect_equal(stats$deviance, rStats$deviance)
+ expect_equal(stats$df.null, rStats$df.null)
+ expect_equal(stats$df.residual, rStats$df.residual)
+ expect_equal(stats$aic, rStats$aic)
+
+ out <- capture.output(print(stats))
+ expect_match(out[2], "Deviance Residuals:")
+ expect_true(any(grepl("AIC: 59.22", out)))
+
+ # binomial family
+ df <- suppressWarnings(createDataFrame(iris))
+ training <- df[df$Species %in% c("versicolor", "virginica"), ]
+ stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width,
+ family = binomial(link = "logit")))
+
+ rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
+ rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
+ family = binomial(link = "logit")))
+
+ coefs <- stats$coefficients
+ rCoefs <- rStats$coefficients
+ expect_true(all(abs(rCoefs - coefs) < 1e-4))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Sepal_Width")))
+ expect_equal(stats$dispersion, rStats$dispersion)
+ expect_equal(stats$null.deviance, rStats$null.deviance)
+ expect_equal(stats$deviance, rStats$deviance)
+ expect_equal(stats$df.null, rStats$df.null)
+ expect_equal(stats$df.residual, rStats$df.residual)
+ expect_equal(stats$aic, rStats$aic)
+
+ # Test spark.glm works with weighted dataset
+ a1 <- c(0, 1, 2, 3)
+ a2 <- c(5, 2, 1, 3)
+ w <- c(1, 2, 3, 4)
+ b <- c(1, 0, 1, 0)
+ data <- as.data.frame(cbind(a1, a2, w, b))
+ df <- createDataFrame(data)
+
+ stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w"))
+ rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w))
+
+ coefs <- stats$coefficients
+ rCoefs <- rStats$coefficients
+ expect_true(all(abs(rCoefs - coefs) < 1e-3))
+ expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2")))
+ expect_equal(stats$dispersion, rStats$dispersion)
+ expect_equal(stats$null.deviance, rStats$null.deviance)
+ expect_equal(stats$deviance, rStats$deviance)
+ expect_equal(stats$df.null, rStats$df.null)
+ expect_equal(stats$df.residual, rStats$df.residual)
+ expect_equal(stats$aic, rStats$aic)
+
+ # Test summary works on base GLM models
+ baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
+ baseSummary <- summary(baseModel)
+ expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+
+ # Test spark.glm works with regularization parameter
+ data <- as.data.frame(cbind(a1, a2, b))
+ df <- suppressWarnings(createDataFrame(data))
+ regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
+ expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
+
+ # Test spark.glm works on collinear data
+ A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
+ b <- c(1, 2, 3, 4)
+ data <- as.data.frame(cbind(A, b))
+ df <- createDataFrame(data)
+ stats <- summary(spark.glm(df, b ~ . - 1))
+ coefs <- stats$coefficients
+ expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4))
+})
+
+test_that("spark.glm save/load", {
+ skip_on_cran()
+
+ training <- suppressWarnings(createDataFrame(iris))
+ m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
+ s <- summary(m)
+
+ modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp")
+ write.ml(m, modelPath)
+ expect_error(write.ml(m, modelPath)) # same path again, overwrite not requested
+ write.ml(m, modelPath, overwrite = TRUE)
+ m2 <- read.ml(modelPath)
+ s2 <- summary(m2)
+ # The reloaded model must report the same summary as the model it was saved from.
+ expect_equal(s$coefficients, s2$coefficients)
+ expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
+ expect_equal(s$dispersion, s2$dispersion)
+ expect_equal(s$null.deviance, s2$null.deviance)
+ expect_equal(s$deviance, s2$deviance)
+ expect_equal(s$df.null, s2$df.null)
+ expect_equal(s$df.residual, s2$df.residual)
+ expect_equal(s$aic, s2$aic)
+ expect_equal(s$iter, s2$iter)
+ expect_true(!s$is.loaded)
+ expect_true(s2$is.loaded)
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+})
+
+test_that("formula of glm", {
+ skip_on_cran()
+ # Same checks as for spark.glm, but through glm() called on a SparkDataFrame.
+ training <- suppressWarnings(createDataFrame(iris))
+ # "." expansion, term removal ("- Species") and no intercept ("+ 0") vs native glm
+ model <- glm(Sepal_Width ~ . - Species + 0, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # interaction term (Species:Sepal_Length) vs native glm
+ model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # glm should work with a formula built from long column names
+ training <- suppressWarnings(createDataFrame(iris))
+ training$LongLongLongLongLongName <- training$Sepal_Width
+ training$VeryLongLongLongLonLongName <- training$Sepal_Length
+ training$AnotherLongLongLongLongName <- training$Species
+ model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
+ data = training)
+ vals <- collect(select(predict(model, training), "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+})
+
+test_that("glm and predict", {
+ skip_on_cran()
+
+ training <- suppressWarnings(createDataFrame(iris))
+ # gaussian family: predictions must be doubles and match native glm
+ model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+ rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # poisson family with identity link
+ model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
+ family = poisson(link = identity))
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+ rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
+ data = iris, family = poisson(link = identity)), iris))
+ expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
+
+ # tweedie family
+ model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
+ family = "tweedie", var.power = 1.2, link.power = 0.0)
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+ vals <- collect(select(prediction, "prediction"))
+
+ # manual calculation of the R predicted values to avoid dependence on statmod
+ #' library(statmod)
+ #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+ #' family = tweedie(var.power = 1.2, link.power = 0.0))
+ #' print(coef(rModel))
+ # rCoef: presumably the coef(rModel) values printed by the snippet above
+ rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+ rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+ data = iris) %*% rCoef))
+ expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
+ # Test stats::predict is working: predict() on a base lm fit returns one value per row
+ x <- rnorm(15)
+ y <- x + rnorm(15)
+ expect_equal(length(predict(lm(y ~ x))), 15)
+})
+
+test_that("glm summary", {
+ skip_on_cran()
+
+ # gaussian family: summary of glm() on a SparkDataFrame vs summary of native glm on iris
+ training <- suppressWarnings(createDataFrame(iris))
+ stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
+
+ rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
+
+ coefs <- stats$coefficients
+ rCoefs <- rStats$coefficients
+ expect_true(all(abs(rCoefs - coefs) < 1e-4))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
+ expect_equal(stats$dispersion, rStats$dispersion)
+ expect_equal(stats$null.deviance, rStats$null.deviance)
+ expect_equal(stats$deviance, rStats$deviance)
+ expect_equal(stats$df.null, rStats$df.null)
+ expect_equal(stats$df.residual, rStats$df.residual)
+ expect_equal(stats$aic, rStats$aic)
+
+ # binomial family: restricted to the versicolor and virginica rows on both sides
+ df <- suppressWarnings(createDataFrame(iris))
+ training <- df[df$Species %in% c("versicolor", "virginica"), ]
+ stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
+ family = binomial(link = "logit")))
+
+ rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
+ rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
+ family = binomial(link = "logit")))
+
+ coefs <- stats$coefficients
+ rCoefs <- rStats$coefficients
+ expect_true(all(abs(rCoefs - coefs) < 1e-4))
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "Sepal_Length", "Sepal_Width")))
+ expect_equal(stats$dispersion, rStats$dispersion)
+ expect_equal(stats$null.deviance, rStats$null.deviance)
+ expect_equal(stats$deviance, rStats$deviance)
+ expect_equal(stats$df.null, rStats$df.null)
+ expect_equal(stats$df.residual, rStats$df.residual)
+ expect_equal(stats$aic, rStats$aic)
+
+ # Test summary works on base GLM models
+ baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
+ baseSummary <- summary(baseModel)
+ expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+})
+
+test_that("glm save/load", {
+ skip_on_cran()
+
+ training <- suppressWarnings(createDataFrame(iris))
+ m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
+ s <- summary(m)
+
+ modelPath <- tempfile(pattern = "glm", fileext = ".tmp")
+ write.ml(m, modelPath)
+ expect_error(write.ml(m, modelPath)) # same path again, overwrite not requested
+ write.ml(m, modelPath, overwrite = TRUE)
+ m2 <- read.ml(modelPath)
+ s2 <- summary(m2)
+ # The reloaded model must report the same summary as the model it was saved from.
+ expect_equal(s$coefficients, s2$coefficients)
+ expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
+ expect_equal(s$dispersion, s2$dispersion)
+ expect_equal(s$null.deviance, s2$null.deviance)
+ expect_equal(s$deviance, s2$deviance)
+ expect_equal(s$df.null, s2$df.null)
+ expect_equal(s$df.residual, s2$df.residual)
+ expect_equal(s$aic, s2$aic)
+ expect_equal(s$iter, s2$iter)
+ expect_true(!s$is.loaded)
+ expect_true(s2$is.loaded)
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+})
+
+test_that("spark.isoreg", {
+ label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
+ feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
+ weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
+ data <- as.data.frame(cbind(label, feature, weight))
+ df <- createDataFrame(data)
+
+ model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
+ weightCol = "weight")
+ # only allow one variable on the right hand side of the formula
+ expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
+ result <- summary(model)
+ expect_equal(result$predictions, list(7, 5, 4, 4, 1))
+
+ # Test model prediction
+ predict_data <- list(list(-2.0), list(-1.0), list(0.5),
+ list(0.75), list(1.0), list(2.0), list(9.0))
+ predict_df <- createDataFrame(predict_data, c("feature"))
+ predict_result <- collect(select(predict(model, predict_df), "prediction"))
+ expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath)) # same path again, overwrite not requested
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ expect_equal(result, summary(model2))
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+ }
+})
+
+test_that("spark.survreg", {
+ # R code to reproduce the result.
+ #
+ #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
+ #' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
+ #' library(survival)
+ #' model <- survreg(Surv(time, status) ~ x + sex, rData)
+ #' summary(model)
+ #' predict(model, rData)
+ #
+ # -- output of 'summary(model)'
+ #
+ # Value Std. Error z p
+ # (Intercept) 1.315 0.270 4.88 1.07e-06
+ # x -0.190 0.173 -1.10 2.72e-01
+ # sex -0.253 0.329 -0.77 4.42e-01
+ # Log(scale) -1.160 0.396 -2.93 3.41e-03
+ #
+ # -- output of 'predict(model, rData)'
+ #
+ # 1 2 3 4 5 6 7
+ # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
+ #
+ data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
+ list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
+ df <- createDataFrame(data, c("time", "status", "x", "sex"))
+ model <- spark.survreg(df, Surv(time, status) ~ x + sex)
+ stats <- summary(model)
+ coefs <- as.vector(stats$coefficients[, 1])
+ rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
+ expect_equal(coefs, rCoefs, tolerance = 1e-4)
+ expect_true(all(
+ rownames(stats$coefficients) ==
+ c("(Intercept)", "x", "sex", "Log(scale)")))
+ p <- collect(select(predict(model, df), "prediction"))
+ expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
+ 2.390146, 2.891269, 2.891269), tolerance = 1e-4)
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath)) # same path again, overwrite not requested
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+ coefs2 <- as.vector(stats2$coefficients[, 1])
+ expect_equal(coefs, coefs2)
+ expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))
+ # write.ml() saves to a directory, which unlink() only removes with recursive = TRUE.
+ unlink(modelPath, recursive = TRUE)
+ }
+
+ # Test survival::survreg
+ if (requireNamespace("survival", quietly = TRUE)) {
+ rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
+ x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
+ expect_error(
+ model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
+ NA)
+ expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)
+ }
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_stat.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_stat.R b/R/pkg/tests/fulltests/test_mllib_stat.R
new file mode 100644
index 0000000..1600833
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_stat.R
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib statistics algorithms")
+
+# Tests for MLlib statistics algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("spark.kstest", {
+ data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5))
+ df <- createDataFrame(data)
+ testResult <- spark.kstest(df, "test", "norm")
+ stats <- summary(testResult)
+ # Reference result from base R's two-sided ks.test against "pnorm" with default parameters
+ rStats <- ks.test(data$test, "pnorm", alternative = "two.sided")
+
+ expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
+ expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
+ expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
+ # Same comparison with -0.5 passed as an extra argument (R side: pnorm with -0.5, 1)
+ testResult <- spark.kstest(df, "test", "norm", -0.5)
+ stats <- summary(testResult)
+
+ rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided")
+
+ expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
+ expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
+ expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
+
+ # Test print.summary.KSTest: check the first and fifth lines of the printed output
+ printStats <- capture.output(print.summary.KSTest(stats))
+ expect_match(printStats[1], "Kolmogorov-Smirnov test summary:")
+ expect_match(printStats[5],
+ "Low presumption against null hypothesis: Sample follows theoretical distribution. ")
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_tree.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_tree.R b/R/pkg/tests/fulltests/test_mllib_tree.R
new file mode 100644
index 0000000..31427ee
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_tree.R
@@ -0,0 +1,320 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib tree-based algorithms")
+
+# Tests for MLlib tree-based algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+# Joins the "spark.home" configuration value with the relative path x.
+absoluteSparkPath <- function(x) {
+  file.path(sparkR.conf("spark.home"), x)
+}
+
+# End-to-end checks for spark.gbt: regression on longley, binary classification on a
+# two-species subset of iris (string and numeric labels), save/load round trips and
+# libsvm input.
+test_that("spark.gbt", {
+  skip_on_cran()
+
+  # regression
+  data <- suppressWarnings(createDataFrame(longley))
+  model <- spark.gbt(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, seed = 123)
+  predictions <- collect(predict(model, data))
+  expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
+                                         63.221, 63.639, 64.989, 63.761,
+                                         66.019, 67.857, 68.169, 66.513,
+                                         68.655, 69.564, 69.331, 70.551),
+               tolerance = 1e-4)
+  stats <- summary(model)
+  expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+  expect_equal(stats$formula, "Employed ~ .")
+  expect_equal(stats$numFeatures, 6)
+  expect_equal(length(stats$treeWeights), 20)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp")
+    write.ml(model, modelPath)
+    # A second write to the same path must fail unless overwrite is requested.
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    expect_equal(stats$formula, stats2$formula)
+    expect_equal(stats$numFeatures, stats2$numFeatures)
+    expect_equal(stats$features, stats2$features)
+    expect_equal(stats$featureImportances, stats2$featureImportances)
+    expect_equal(stats$maxDepth, stats2$maxDepth)
+    expect_equal(stats$numTrees, stats2$numTrees)
+    expect_equal(stats$treeWeights, stats2$treeWeights)
+
+    # Spark persists the model as a directory tree, which unlink() leaves in place
+    # unless recursive = TRUE.
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  # classification
+  # label must be binary - GBTClassifier currently only supports binary classification.
+  iris2 <- iris[iris$Species != "virginica", ]
+  data <- suppressWarnings(createDataFrame(iris2))
+  model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification")
+  stats <- summary(model)
+  expect_equal(stats$numFeatures, 2)
+  expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+  # Printing the summary must not raise and must produce more than 6 lines.
+  expect_error(capture.output(stats), NA)
+  expect_true(length(capture.output(stats)) > 6)
+  predictions <- collect(predict(model, data))$prediction
+  # test string prediction values
+  expect_equal(length(grep("setosa", predictions)), 50)
+  expect_equal(length(grep("versicolor", predictions)), 50)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    # NOTE(review): depth, numNodes and numClasses are not asserted anywhere else in this
+    # test; if the summary does not expose them these compare NULL with NULL - confirm.
+    expect_equal(stats$depth, stats2$depth)
+    expect_equal(stats$numNodes, stats2$numNodes)
+    expect_equal(stats$numClasses, stats2$numClasses)
+
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1)
+  df <- suppressWarnings(createDataFrame(iris2))
+  m <- spark.gbt(df, NumericSpecies ~ ., type = "classification")
+  s <- summary(m)
+  # test numeric prediction values
+  expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
+  expect_equal(s$numFeatures, 5)
+  expect_equal(s$numTrees, 20)
+  # Assert on the summary of m (s), not on the stale summary of the string-label model.
+  expect_equal(s$maxDepth, 5)
+
+  # spark.gbt classification can work on libsvm data
+  if (not_cran_or_windows_with_hadoop()) {
+    data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
+                    source = "libsvm")
+    model <- spark.gbt(data, label ~ features, "classification")
+    expect_equal(summary(model)$numFeatures, 692)
+  }
+})
+
+# End-to-end checks for spark.randomForest: regression on longley (1 tree and 20 trees),
+# classification on iris with string and numeric labels, save/load round trips and
+# libsvm input.
+test_that("spark.randomForest", {
+  # regression
+  data <- suppressWarnings(createDataFrame(longley))
+  model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
+                              numTrees = 1)
+
+  predictions <- collect(predict(model, data))
+  expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
+                                         63.221, 63.639, 64.989, 63.761,
+                                         66.019, 67.857, 68.169, 66.513,
+                                         68.655, 69.564, 69.331, 70.551),
+               tolerance = 1e-4)
+
+  stats <- summary(model)
+  expect_equal(stats$numTrees, 1)
+  expect_equal(stats$maxDepth, 5)
+  # Printing the summary must not raise and must produce more than 6 lines.
+  expect_error(capture.output(stats), NA)
+  expect_true(length(capture.output(stats)) > 6)
+
+  model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
+                              numTrees = 20, seed = 123)
+  predictions <- collect(predict(model, data))
+  expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070,
+                                         63.53160, 64.05470, 65.12710, 64.30450,
+                                         66.70910, 67.86125, 68.08700, 67.21865,
+                                         68.89275, 69.53180, 69.39640, 69.68250),
+               tolerance = 1e-4)
+  stats <- summary(model)
+  expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
+    write.ml(model, modelPath)
+    # A second write to the same path must fail unless overwrite is requested.
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    expect_equal(stats$formula, stats2$formula)
+    expect_equal(stats$numFeatures, stats2$numFeatures)
+    expect_equal(stats$features, stats2$features)
+    expect_equal(stats$featureImportances, stats2$featureImportances)
+    expect_equal(stats$numTrees, stats2$numTrees)
+    expect_equal(stats$maxDepth, stats2$maxDepth)
+    expect_equal(stats$treeWeights, stats2$treeWeights)
+
+    # Spark persists the model as a directory tree, which unlink() leaves in place
+    # unless recursive = TRUE.
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  # classification
+  data <- suppressWarnings(createDataFrame(iris))
+  model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification",
+                              maxDepth = 5, maxBins = 16)
+
+  stats <- summary(model)
+  expect_equal(stats$numFeatures, 2)
+  expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+  expect_error(capture.output(stats), NA)
+  expect_true(length(capture.output(stats)) > 6)
+  # Test string prediction values
+  predictions <- collect(predict(model, data))$prediction
+  expect_equal(length(grep("setosa", predictions)), 50)
+  expect_equal(length(grep("versicolor", predictions)), 50)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    # NOTE(review): depth, numNodes and numClasses are not asserted anywhere else in this
+    # test; if the summary does not expose them these compare NULL with NULL - confirm.
+    expect_equal(stats$depth, stats2$depth)
+    expect_equal(stats$numNodes, stats2$numNodes)
+    expect_equal(stats$numClasses, stats2$numClasses)
+
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  # Test numeric response variable
+  labelToIndex <- function(species) {
+    switch(as.character(species),
+      setosa = 0.0,
+      versicolor = 1.0,
+      virginica = 2.0
+    )
+  }
+  iris$NumericSpecies <- lapply(iris$Species, labelToIndex)
+  # iris[-5] drops the fifth column so only the numeric label column carries the species.
+  data <- suppressWarnings(createDataFrame(iris[-5]))
+  model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification",
+                              maxDepth = 5, maxBins = 16)
+  stats <- summary(model)
+  expect_equal(stats$numFeatures, 2)
+  expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+
+  # Test numeric prediction values
+  predictions <- collect(predict(model, data))$prediction
+  expect_equal(length(grep("1.0", predictions)), 50)
+  expect_equal(length(grep("2.0", predictions)), 50)
+
+  # spark.randomForest classification can work on libsvm data
+  if (not_cran_or_windows_with_hadoop()) {
+    data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+                    source = "libsvm")
+    model <- spark.randomForest(data, label ~ features, "classification")
+    expect_equal(summary(model)$numFeatures, 4)
+  }
+})
+
+# End-to-end checks for spark.decisionTree: regression on longley, classification on iris
+# with string and numeric labels, save/load round trips and libsvm input.
+test_that("spark.decisionTree", {
+  skip_on_cran()
+
+  # regression
+  data <- suppressWarnings(createDataFrame(longley))
+  model <- spark.decisionTree(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16)
+
+  predictions <- collect(predict(model, data))
+  expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
+                                         63.221, 63.639, 64.989, 63.761,
+                                         66.019, 67.857, 68.169, 66.513,
+                                         68.655, 69.564, 69.331, 70.551),
+               tolerance = 1e-4)
+
+  stats <- summary(model)
+  expect_equal(stats$maxDepth, 5)
+  # Printing the summary must not raise and must produce more than 6 lines.
+  expect_error(capture.output(stats), NA)
+  expect_true(length(capture.output(stats)) > 6)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-decisionTreeRegression", fileext = ".tmp")
+    write.ml(model, modelPath)
+    # A second write to the same path must fail unless overwrite is requested.
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    expect_equal(stats$formula, stats2$formula)
+    expect_equal(stats$numFeatures, stats2$numFeatures)
+    expect_equal(stats$features, stats2$features)
+    expect_equal(stats$featureImportances, stats2$featureImportances)
+    expect_equal(stats$maxDepth, stats2$maxDepth)
+
+    # Spark persists the model as a directory tree, which unlink() leaves in place
+    # unless recursive = TRUE.
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  # classification
+  data <- suppressWarnings(createDataFrame(iris))
+  model <- spark.decisionTree(data, Species ~ Petal_Length + Petal_Width, "classification",
+                              maxDepth = 5, maxBins = 16)
+
+  stats <- summary(model)
+  expect_equal(stats$numFeatures, 2)
+  expect_equal(stats$maxDepth, 5)
+  expect_error(capture.output(stats), NA)
+  expect_true(length(capture.output(stats)) > 6)
+  # Test string prediction values
+  predictions <- collect(predict(model, data))$prediction
+  expect_equal(length(grep("setosa", predictions)), 50)
+  expect_equal(length(grep("versicolor", predictions)), 50)
+
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-decisionTreeClassification", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    # NOTE(review): depth, numNodes and numClasses are not asserted anywhere else in this
+    # test; if the summary does not expose them these compare NULL with NULL - confirm.
+    expect_equal(stats$depth, stats2$depth)
+    expect_equal(stats$numNodes, stats2$numNodes)
+    expect_equal(stats$numClasses, stats2$numClasses)
+
+    unlink(modelPath, recursive = TRUE)
+  }
+
+  # Test numeric response variable
+  labelToIndex <- function(species) {
+    switch(as.character(species),
+      setosa = 0.0,
+      versicolor = 1.0,
+      virginica = 2.0
+    )
+  }
+  iris$NumericSpecies <- lapply(iris$Species, labelToIndex)
+  # iris[-5] drops the fifth column so only the numeric label column carries the species.
+  data <- suppressWarnings(createDataFrame(iris[-5]))
+  model <- spark.decisionTree(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification",
+                              maxDepth = 5, maxBins = 16)
+  stats <- summary(model)
+  expect_equal(stats$numFeatures, 2)
+  expect_equal(stats$maxDepth, 5)
+
+  # Test numeric prediction values
+  predictions <- collect(predict(model, data))$prediction
+  expect_equal(length(grep("1.0", predictions)), 50)
+  expect_equal(length(grep("2.0", predictions)), 50)
+
+  # spark.decisionTree classification can work on libsvm data
+  if (not_cran_or_windows_with_hadoop()) {
+    data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+                    source = "libsvm")
+    model <- spark.decisionTree(data, label ~ features, "classification")
+    expect_equal(summary(model)$numFeatures, 4)
+  }
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_parallelize_collect.R b/R/pkg/tests/fulltests/test_parallelize_collect.R
new file mode 100644
index 0000000..52d4c93
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_parallelize_collect.R
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("parallelize() and collect()")
+
+# Fixtures shared by every test in this file: numeric and character collections,
+# both as atomic vectors and as lists, plus lists of two-element pairs.
+numVector <- -10:97
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ^ 10)
+strVector <- c(
+  "Dexter Morgan: I suppose I should be upset, even feel",
+  "violated, but I'm not. No, in fact, I think this is a friendly",
+  "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+  "I really, really do.")
+strList <- list(
+  "Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+  "other times it helps me control the chaos.",
+  "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+  "raising me. But they're both dead now. I didn't kill them. Honest.")
+
+numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
+strPairs <- list(list(strList, strList), list(strList, strList))
+
+# Start the session, then fetch the JavaSparkContext handle that parallelize() takes.
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+jsc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Tests
+
+# Every fixture, at two different slice counts, must come back as an RDD wrapping a JavaRDD.
+test_that("parallelize() on simple vectors and lists returns an RDD", {
+  skip_on_cran()
+
+  rdds <- c(parallelize(jsc, numVector, 1),
+            parallelize(jsc, numVector, 10),
+            parallelize(jsc, numList, 1),
+            parallelize(jsc, numList, 4),
+            parallelize(jsc, strVector, 2),
+            parallelize(jsc, strVector, 3),
+            parallelize(jsc, strList, 4),
+            parallelize(jsc, strList, 1))
+
+  for (candidate in rdds) {
+    expect_is(candidate, "RDD")
+    # Short-circuit on purpose: the slot is only read once it is known to exist.
+    wrapsJavaRdd <- .hasSlot(candidate, "jrdd") &&
+      inherits(candidate@jrdd, "jobj") &&
+      isInstanceOf(candidate@jrdd, "org.apache.spark.api.java.JavaRDD")
+    expect_true(wrapsJavaRdd)
+  }
+})
+
+# Round trip: collectRDD(parallelize(x)) must equal as.list(x) whatever the slice count.
+test_that("collect(), following a parallelize(), gives back the original collections", {
+  skip_on_cran()
+
+  expect_equal(collectRDD(parallelize(jsc, numVector, 10)), as.list(numVector))
+
+  for (slices in c(1, 4)) {
+    expect_equal(collectRDD(parallelize(jsc, numList, slices)), as.list(numList))
+  }
+
+  for (slices in c(2, 3)) {
+    expect_equal(collectRDD(parallelize(jsc, strVector, slices)), as.list(strVector))
+  }
+
+  for (slices in c(4, 1)) {
+    expect_equal(collectRDD(parallelize(jsc, strList, slices)), as.list(strList))
+  }
+})
+
+# Uses a length that does not divide evenly by the slice count:
+# 10 %/% 6 = 1, ceiling(10 / 6) = 2
+test_that("regression: collect() following a parallelize() does not drop elements", {
+  skip_on_cran()
+
+  elementCount <- 10
+  sliceCount <- 6
+  original <- runif(elementCount)
+  roundTripped <- collectRDD(parallelize(jsc, original, sliceCount))
+  expect_equal(roundTripped, as.list(original))
+})
+
+# Lists of two-element lists must survive the parallelize()/collectRDD() round trip.
+test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
+  skip_on_cran()
+
+  # numeric pairs at 1, 2 and 3 slices
+  numericRdds <- list(parallelize(jsc, numPairs, 1),
+                      parallelize(jsc, numPairs, 2),
+                      parallelize(jsc, numPairs, 3))
+  for (pairRdd in numericRdds) {
+    expect_equal(collectRDD(pairRdd), numPairs)
+  }
+
+  # pairs whose members are themselves lists of strings, at 1 and 2 slices
+  stringRdds <- list(parallelize(jsc, strPairs, 1),
+                     parallelize(jsc, strPairs, 2))
+  for (pairRdd in stringRdds) {
+    expect_equal(collectRDD(pairRdd), strPairs)
+  }
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_rdd.R b/R/pkg/tests/fulltests/test_rdd.R
new file mode 100644
index 0000000..fb244e1
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_rdd.R
@@ -0,0 +1,906 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("basic RDD functions")
+
+# Start the session and fetch the JavaSparkContext handle used by the RDD API.
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Shared fixtures: the integers 1..10 and four (integer key, numeric value) pairs,
+# each parallelized into two slices.
+nums <- seq_len(10)
+rdd <- parallelize(sc, nums, 2L)
+
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+# Both shared fixtures were parallelized with 2L slices.
+test_that("get number of partitions in RDD", {
+  skip_on_cran()
+
+  for (fixture in list(rdd, intRdd)) {
+    expect_equal(getNumPartitionsRDD(fixture), 2)
+  }
+})
+
+# firstRDD on the plain fixture and on the same fixture after adding 1 to each element.
+test_that("first on RDD", {
+  skip_on_cran()
+
+  expect_equal(firstRDD(rdd), 1)
+  incremented <- lapply(rdd, function(x) { x + 1 })
+  expect_equal(firstRDD(incremented), 2)
+})
+
+# countRDD and lengthRDD must both report the 10 elements of the shared fixture.
+test_that("count and length on RDD", {
+  skip_on_cran()
+
+  counted <- countRDD(rdd)
+  expect_equal(counted, 10)
+  measured <- lengthRDD(rdd)
+  expect_equal(measured, 10)
+})
+
+# countByValue on the residues of 1..10 modulo 3, and countByKey on the pair fixture.
+# Both sides are sorted first, so the comparison ignores result order.
+test_that("count by values and keys", {
+  skip_on_cran()
+
+  residues <- lapply(rdd, function(x) { x %% 3 })
+  valueCounts <- countByValue(residues)
+  expectedValueCounts <- list(list(0, 3L), list(1, 4L), list(2, 3L))
+  expect_equal(sortKeyValueList(valueCounts), sortKeyValueList(expectedValueCounts))
+
+  keyCounts <- countByKey(intRdd)
+  expectedKeyCounts <- list(list(2L, 2L), list(1L, 2L))
+  expect_equal(sortKeyValueList(keyCounts), sortKeyValueList(expectedKeyCounts))
+})
+
+# Doubling every element through lapply must match doubling the local vector.
+test_that("lapply on RDD", {
+  skip_on_cran()
+
+  doubled <- collectRDD(lapply(rdd, function(x) { 2 * x }))
+  expect_equal(doubled, as.list(nums * 2))
+})
+
+# One sum per partition of the two-slice fixture: 15 and 40.
+test_that("lapplyPartition on RDD", {
+  skip_on_cran()
+
+  partitionSums <- collectRDD(lapplyPartition(rdd, function(part) { sum(unlist(part)) }))
+  expect_equal(partitionSums, list(15, 40))
+})
+
+# Same per-partition sums as the lapplyPartition test, computed through mapPartitions.
+test_that("mapPartitions on RDD", {
+  skip_on_cran()
+
+  partitionSums <- collectRDD(mapPartitions(rdd, function(part) { sum(unlist(part)) }))
+  expect_equal(partitionSums, list(15, 40))
+})
+
+# Emitting each pair twice must yield every pair duplicated, in the original order.
+test_that("flatMap() on RDDs", {
+  skip_on_cran()
+
+  duplicated <- collectRDD(flatMap(intRdd, function(x) { list(x, x) }))
+  expect_equal(duplicated, rep(intPairs, each = 2))
+})
+
+# Covers filterRDD, the Filter method with an RDD argument, and a predicate nothing passes.
+test_that("filterRDD on RDD", {
+  skip_on_cran()
+
+  evens <- filterRDD(rdd, function(x) { x %% 2 == 0 })
+  expect_equal(collectRDD(evens), list(2, 4, 6, 8, 10))
+
+  negativeValued <- Filter(function(x) { x[[2]] < 0 }, intRdd)
+  expect_equal(collectRDD(negativeValued), list(list(1L, -1)))
+
+  # Filter out all elements.
+  nothingLeft <- filterRDD(rdd, function(x) { x > 10 })
+  expect_equal(collectRDD(nothingLeft), list())
+})
+
+# lookup returns every value stored under a key, and an empty list for an absent key.
+test_that("lookup on RDD", {
+  skip_on_cran()
+
+  expect_equal(lookup(intRdd, 1L), list(-1, 200))
+  expect_equal(lookup(intRdd, 3L), list())
+})
+
+# Chains twelve lapplyPartitionsWithIndex steps and a final lapply on top of each other,
+# then checks the collected result.
+test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
+  skip_on_cran()
+
+  pipelined <- rdd
+  for (i in 1:12) {
+    # Each step multiplies by the partition index and adds the current loop counter.
+    pipelined <- lapplyPartitionsWithIndex(
+      pipelined,
+      function(partIndex, part) {
+        as.list(unlist(part) * partIndex + i)
+      })
+  }
+  pipelined <- lapply(pipelined, function(x) { x + x })
+
+  expect_equal(collectRDD(pipelined),
+               list(24, 24, 24, 24, 24,
+                    168, 170, 172, 174, 176))
+})
+
+# Checks the isCached / isCheckpointed flags of a PipelinedRDD after cacheRDD, persistRDD,
+# unpersistRDD and checkpointRDD, and that a transformation derived from it starts with
+# both flags cleared.
+test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
+  skip_on_cran()
+
+  # RDD
+  rdd2 <- rdd
+  # PipelinedRDD
+  rdd2 <- lapplyPartitionsWithIndex(
+    rdd2,
+    function(partIndex, part) {
+      part <- as.list(unlist(part) * partIndex)
+    })
+
+  cacheRDD(rdd2)
+  expect_true(rdd2@env$isCached)
+  # A new RDD derived from the cached one is not itself cached.
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+
+  unpersistRDD(rdd2)
+  expect_false(rdd2@env$isCached)
+
+  persistRDD(rdd2, "MEMORY_AND_DISK")
+  expect_true(rdd2@env$isCached)
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+
+  unpersistRDD(rdd2)
+  expect_false(rdd2@env$isCached)
+
+  tempDir <- tempfile(pattern = "checkpoint")
+  setCheckpointDirSC(sc, tempDir)
+  checkpointRDD(rdd2)
+  expect_true(rdd2@env$isCheckpointed)
+
+  rdd2 <- lapply(rdd2, function(x) x)
+  expect_false(rdd2@env$isCached)
+  expect_false(rdd2@env$isCheckpointed)
+
+  # make sure the data is collectable
+  collectRDD(rdd2)
+
+  # tempDir is a directory; unlink() leaves directories in place unless recursive = TRUE.
+  unlink(tempDir, recursive = TRUE)
+})
+
+# reduce must accept both an operator name and a function; 1 + ... + 10 is 55.
+test_that("reduce on RDD", {
+  skip_on_cran()
+
+  totalByName <- reduce(rdd, "+")
+  expect_equal(totalByName, 55)
+
+  # Also test with an inline function
+  totalByFunction <- reduce(rdd, function(x, y) { x + y })
+  expect_equal(totalByFunction, 55)
+})
+
+# The mapped function refers to a variable defined outside of it (fa).
+test_that("lapply with dependency", {
+  skip_on_cran()
+
+  fa <- 5
+  scaled <- collectRDD(lapply(rdd, function(x) { fa * x }))
+
+  expect_equal(scaled, as.list(nums * 5))
+})
+
+# The function receives the partition index and the partition contents; checked on the
+# shared fixture and on a pair RDD repartitioned by key parity.
+test_that("lapplyPartitionsWithIndex on RDDs", {
+  skip_on_cran()
+
+  indexAndSum <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
+  summed <- collectRDD(lapplyPartitionsWithIndex(rdd, indexAndSum), flatten = FALSE)
+  expect_equal(summed, list(list(0, 15), list(1, 40)))
+
+  pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
+  # Odd keys are sent to partition 0 and even keys to partition 1.
+  partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
+  indexAndContents <- function(partIndex, part) { list(partIndex, part) }
+  byParity <- partitionByRDD(pairsRDD, 2L, partitionByParity)
+  grouped <- collectRDD(lapplyPartitionsWithIndex(byParity, indexAndContents), FALSE)
+  expect_equal(grouped, list(list(0, list(list(1, 2), list(3, 4))),
+                             list(1, list(list(4, 8)))))
+})
+
+# Sampling without replacement at fraction 1.0 must give back every element.
+test_that("sampleRDD() on RDDs", {
+  skip_on_cran()
+
+  everything <- sampleRDD(rdd, FALSE, 1.0, 2014L)
+  expect_equal(unlist(collectRDD(everything)), nums)
+})
+
+# Sample sizes, uniqueness and value range of takeSample on 1:100, with and without
+# replacement, for the seeds 4 and 5.
+test_that("takeSample() on RDDs", {
+  skip_on_cran()
+
+  # Every sampled element must lie in the source range 1..100.
+  expectAllInRange <- function(sampled) {
+    for (elem in sampled) {
+      expect_true(elem >= 1 && elem <= 100)
+    }
+  }
+
+  # ported from RDDSuite.scala, modified seeds
+  data <- parallelize(sc, 1:100, 2L)
+
+  # Without replacement, fewer than available: 20 distinct elements.
+  for (seed in 4:5) {
+    sampled <- takeSample(data, FALSE, 20L, seed)
+    expect_equal(length(sampled), 20L)
+    expect_equal(length(unique(sampled)), 20L)
+    expectAllInRange(sampled)
+  }
+
+  # Without replacement, more than available: all 100 elements, each exactly once.
+  for (seed in 4:5) {
+    sampled <- takeSample(data, FALSE, 200L, seed)
+    expect_equal(length(sampled), 100L)
+    expect_equal(length(unique(sampled)), 100L)
+    expectAllInRange(sampled)
+  }
+
+  # With replacement: the requested number of elements comes back.
+  for (seed in 4:5) {
+    sampled <- takeSample(data, TRUE, 20L, seed)
+    expect_equal(length(sampled), 20L)
+    expectAllInRange(sampled)
+  }
+
+  for (seed in 4:5) {
+    sampled <- takeSample(data, TRUE, 100L, seed)
+    expect_equal(length(sampled), 100L)
+    # Chance of getting all distinct elements is astronomically low, so test we
+    # got less than 100
+    expect_true(length(unique(sampled)) < 100L)
+  }
+
+  for (seed in 4:5) {
+    sampled <- takeSample(data, TRUE, 200L, seed)
+    expect_equal(length(sampled), 200L)
+    # Chance of getting all distinct elements is still quite low, so test we
+    # got less than 100
+    expect_true(length(unique(sampled)) < 100L)
+  }
+})
+
+# mapValues must double the value of every pair and leave the key alone.
+test_that("mapValues() on pairwise RDDs", {
+  skip_on_cran()
+
+  doubledValues <- collectRDD(mapValues(intRdd, function(x) { x * 2 }))
+  reference <- lapply(intPairs, function(pair) {
+    list(pair[[1]], pair[[2]] * 2)
+  })
+  expect_equal(sortKeyValueList(doubledValues), sortKeyValueList(reference))
+})
+
+# flatMapValues expands each value into several, repeating the key for every result.
+test_that("flatMapValues() on pairwise RDDs", {
+  skip_on_cran()
+
+  vectorValued <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
+  flattened <- collectRDD(flatMapValues(vectorValued, function(x) { x }))
+  expect_equal(flattened, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+
+  # Generate x to x+1 for every value
+  expanded <- collectRDD(flatMapValues(intRdd, function(x) { x:(x + 1) }))
+  expect_equal(expanded,
+               list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
+                    list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
+})
+
+# Values under equal keys are added together; covers numeric keys and mixed
+# string/numeric keys.
+test_that("reduceByKeyLocally() on PairwiseRDDs", {
+  skip_on_cran()
+
+  numericKeyed <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
+  numericTotals <- reduceByKeyLocally(numericKeyed, "+")
+  expect_equal(sortKeyValueList(numericTotals),
+               sortKeyValueList(list(list(1, 6), list(1.1, 3))))
+
+  mixedKeyed <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
+                                     list("bb", 5)), 4L)
+  mixedTotals <- reduceByKeyLocally(mixedKeyed, "+")
+  expect_equal(sortKeyValueList(mixedTotals),
+               sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
+})
+
+# 1:10 repeated twice must collapse back to 1:10 (compared after sorting).
+test_that("distinct() on RDDs", {
+  skip_on_cran()
+
+  repeated <- parallelize(sc, rep(1:10, 2), 2L)
+  deduplicated <- collectRDD(distinctRDD(repeated))
+  expect_equal(sort(unlist(deduplicated)), nums)
+})
+
+# The largest element of the 1..10 fixture is 10.
+test_that("maximum() on RDDs", {
+  skip_on_cran()
+
+  largest <- maximum(rdd)
+  expect_equal(largest, 10)
+})
+
+# The smallest element of the 1..10 fixture is 1.
+test_that("minimum() on RDDs", {
+  skip_on_cran()
+
+  smallest <- minimum(rdd)
+  expect_equal(smallest, 1)
+})
+
+# 1 + 2 + ... + 10 is 55.
+test_that("sumRDD() on RDDs", {
+  skip_on_cran()
+
+  total <- sumRDD(rdd)
+  expect_equal(total, 55)
+})
+
+# keyBy pairs every element with the key computed from it: list(key, element).
+test_that("keyBy on RDDs", {
+  skip_on_cran()
+
+  square <- function(x) { x * x }
+  keyed <- collectRDD(keyBy(rdd, square))
+  reference <- lapply(nums, function(x) { list(square(x), x) })
+  expect_equal(keyed, reference)
+})
+
+# Partition counts after repartitionRDD / coalesceRDD, plus a loose bound on the size of
+# the first partition.
+test_that("repartition/coalesce on RDDs", {
+  skip_on_cran()
+
+  source <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
+
+  # repartition
+  halved <- repartitionRDD(source, 2)
+  expect_equal(getNumPartitionsRDD(halved), 2L)
+  firstSize <- length(collectPartition(halved, 0L))
+  expect_true(firstSize >= 8 && firstSize <= 12)
+
+  widened <- repartitionRDD(source, 6)
+  expect_equal(getNumPartitionsRDD(widened), 6L)
+  firstSize <- length(collectPartition(widened, 0L))
+  expect_true(firstSize >= 0 && firstSize <= 4)
+
+  # coalesce
+  single <- coalesceRDD(source, 1)
+  expect_equal(getNumPartitionsRDD(single), 1L)
+  firstSize <- length(collectPartition(single, 0L))
+  expect_equal(firstSize, 20)
+})
+
+# Sorting by the square of each element, descending and then with the default order.
+test_that("sortBy() on RDDs", {
+  skip_on_cran()
+
+  descending <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
+  expect_equal(collectRDD(descending), as.list(sort(nums, decreasing = TRUE)))
+
+  reversed <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+  defaultOrder <- sortBy(reversed, function(x) { x * x })
+  expect_equal(collectRDD(defaultOrder), as.list(nums))
+})
+
+# takeOrdered(n) must equal the first n elements of the locally sorted data.
+test_that("takeOrdered() on RDDs", {
+  skip_on_cran()
+
+  numbers <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  smallestSix <- takeOrdered(parallelize(sc, numbers), 6L)
+  expect_equal(smallestSix, as.list(sort(unlist(numbers)))[1:6])
+
+  letters5 <- list("e", "d", "c", "d", "a")
+  smallestThree <- takeOrdered(parallelize(sc, letters5), 3L)
+  expect_equal(smallestThree, as.list(sort(unlist(letters5)))[1:3])
+})
+
+# top(n) must equal the first n elements of the data sorted in decreasing order.
+test_that("top() on RDDs", {
+  skip_on_cran()
+
+  numbers <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
+  largestSix <- top(parallelize(sc, numbers), 6L)
+  expect_equal(largestSix, as.list(sort(unlist(numbers), decreasing = TRUE))[1:6])
+
+  letters5 <- list("e", "d", "c", "d", "a")
+  largestThree <- top(parallelize(sc, letters5), 3L)
+  expect_equal(largestThree, as.list(sort(unlist(letters5), decreasing = TRUE))[1:3])
+})
+
+# fold with zero value 0 and "+" matches the local Reduce; an empty RDD gives the zero value.
+test_that("fold() on RDDs", {
+  skip_on_cran()
+
+  folded <- fold(rdd, 0, "+")
+  expect_equal(folded, Reduce("+", nums, 0))
+
+  emptyRdd <- parallelize(sc, list())
+  foldedEmpty <- fold(emptyRdd, 0, "+")
+  expect_equal(foldedEmpty, 0)
+})
+
+# Aggregates into list(sum, count): seqOp adds one element, combOp merges two partial
+# results. An empty RDD must give back the zero value.
+test_that("aggregateRDD() on RDDs", {
+  skip_on_cran()
+
+  zeroValue <- list(0, 0)
+  seqOp <- function(acc, elem) { list(acc[[1]] + elem, acc[[2]] + 1) }
+  combOp <- function(left, right) { list(left[[1]] + right[[1]], left[[2]] + right[[2]]) }
+
+  fourNumbers <- parallelize(sc, list(1, 2, 3, 4))
+  expect_equal(aggregateRDD(fourNumbers, zeroValue, seqOp, combOp), list(10, 4))
+
+  emptyRdd <- parallelize(sc, list())
+  expect_equal(aggregateRDD(emptyRdd, zeroValue, seqOp, combOp), list(0, 0))
+})
+
+# With three slices the ids are not consecutive (0, 1, 4, 2, 5); with one slice they are.
+test_that("zipWithUniqueId() on RDDs", {
+  skip_on_cran()
+
+  letters5 <- list("a", "b", "c", "d", "e")
+
+  threeSlices <- collectRDD(zipWithUniqueId(parallelize(sc, letters5, 3L)))
+  expect_equal(threeSlices,
+               list(list("a", 0), list("b", 1), list("c", 4),
+                    list("d", 2), list("e", 5)))
+
+  oneSlice <- collectRDD(zipWithUniqueId(parallelize(sc, letters5, 1L)))
+  expect_equal(oneSlice,
+               list(list("a", 0), list("b", 1), list("c", 2),
+                    list("d", 3), list("e", 4)))
+})
+
+# The indices are consecutive from 0 whether the data sits in three slices or in one.
+test_that("zipWithIndex() on RDDs", {
+  skip_on_cran()
+
+  letters5 <- list("a", "b", "c", "d", "e")
+  indexed <- list(list("a", 0), list("b", 1), list("c", 2),
+                  list("d", 3), list("e", 4))
+
+  threeSlices <- collectRDD(zipWithIndex(parallelize(sc, letters5, 3L)))
+  expect_equal(threeSlices, indexed)
+
+  oneSlice <- collectRDD(zipWithIndex(parallelize(sc, letters5, 1L)))
+  expect_equal(oneSlice, indexed)
+})
+
+# glom turns each of the two partitions of 1:4 into one list element.
+test_that("glom() on RDD", {
+  skip_on_cran()
+
+  fourInTwo <- parallelize(sc, as.list(1:4), 2L)
+  perPartition <- collectRDD(glom(fourInTwo))
+  expect_equal(perPartition, list(list(1, 2), list(3, 4)))
+})
+
+# keys() must give the first member of every pair, in order.
+test_that("keys() on RDDs", {
+  skip_on_cran()
+
+  collectedKeys <- collectRDD(keys(intRdd))
+  expect_equal(collectedKeys, lapply(intPairs, function(pair) { pair[[1]] }))
+})
+
+# values() must give the second member of every pair, in order.
+test_that("values() on RDDs", {
+  skip_on_cran()
+
+  collectedValues <- collectRDD(values(intRdd))
+  expect_equal(collectedValues, lapply(intPairs, function(pair) { pair[[2]] }))
+})
+
+# Pipes elements through the external commands "more" and "sort"; results come back
+# as character strings.
+test_that("pipeRDD() on RDDs", {
+  skip_on_cran()
+
+  echoed <- collectRDD(pipeRDD(rdd, "more"))
+  expect_equal(echoed, as.list(as.character(1:10)))
+
+  # Elements with trailing newline characters come back without them.
+  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
+  trimmed <- collectRDD(pipeRDD(trailed.rdd, "sort"))
+  expect_equal(trimmed, list("", "1", "2", "3"))
+
+  # 9:0 in two slices: the expected result is 5..9 followed by 0..4.
+  rev.rdd <- parallelize(sc, 9:0, 2L)
+  sortedPerSlice <- collectRDD(pipeRDD(rev.rdd, "sort"))
+  expect_equal(sortedPerSlice, as.list(as.character(c(5:9, 0:4))))
+})
+
+# zipRDD across combinations of parallelized, text-file and mapped RDDs.
+test_that("zipRDD() on RDDs", {
+  skip_on_cran()
+
+  lowRdd <- parallelize(sc, 0:4, 2)
+  highRdd <- parallelize(sc, 1000:1004, 2)
+  expect_equal(collectRDD(zipRDD(lowRdd, highRdd)),
+               list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
+
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+  selfPairs <- lapply(mockFile, function(line) { list(line, line) })
+
+  # text-file RDD zipped with itself
+  linesRdd <- textFile(sc, fileName, 1)
+  expect_equal(collectRDD(zipRDD(linesRdd, linesRdd)), selfPairs)
+
+  # parallelized RDD zipped with the text-file RDD
+  indexRdd <- parallelize(sc, 0:1, 1)
+  indexedLines <- lapply(0:1, function(i) { list(i, mockFile[i + 1]) })
+  expect_equal(collectRDD(zipRDD(indexRdd, linesRdd)), indexedLines)
+
+  # text-file RDD zipped with a mapped copy of itself
+  mappedRdd <- map(linesRdd, function(x) { x })
+  expect_equal(collectRDD(zipRDD(linesRdd, mappedRdd)), selfPairs)
+
+  unlink(fileName)
+})
+
+test_that("cartesian() on RDDs", {
+  skip_on_cran()
+
+  # Cartesian product of 1:3 with itself: all nine ordered pairs.
+  rdd <- parallelize(sc, 1:3)
+  actual <- collectRDD(cartesian(rdd, rdd))
+  expect_equal(sortKeyValueList(actual),
+               list(
+                 list(1, 1), list(1, 2), list(1, 3),
+                 list(2, 1), list(2, 2), list(2, 3),
+                 list(3, 1), list(3, 2), list(3, 3)))
+
+  # test case where one RDD is empty
+  emptyRdd <- parallelize(sc, list())
+  actual <- collectRDD(cartesian(rdd, emptyRdd))
+  expect_equal(actual, list())
+
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  # The remaining checks read from the temp file. They run inside
+  # tryCatch(finally = ...) so the file is deleted even if a Spark call
+  # raises an error before the end of the test.
+  tryCatch({
+    # A text-file RDD crossed with itself.
+    rdd <- textFile(sc, fileName)
+    actual <- collectRDD(cartesian(rdd, rdd))
+    expected <- list(
+      list("Spark is awesome.", "Spark is pretty."),
+      list("Spark is awesome.", "Spark is awesome."),
+      list("Spark is pretty.", "Spark is pretty."),
+      list("Spark is pretty.", "Spark is awesome."))
+    expect_equal(sortKeyValueList(actual), expected)
+
+    # A parallelized RDD crossed with the text-file RDD.
+    rdd1 <- parallelize(sc, 0:1)
+    actual <- collectRDD(cartesian(rdd1, rdd))
+    expect_equal(sortKeyValueList(actual),
+                 list(
+                   list(0, "Spark is pretty."),
+                   list(0, "Spark is awesome."),
+                   list(1, "Spark is pretty."),
+                   list(1, "Spark is awesome.")))
+
+    # The text-file RDD crossed with a mapped copy of itself.
+    rdd1 <- map(rdd, function(x) { x })
+    actual <- collectRDD(cartesian(rdd, rdd1))
+    expect_equal(sortKeyValueList(actual), expected)
+  }, finally = unlink(fileName))
+})
+
+test_that("subtract() on RDDs", {
+  skip_on_cran()
+
+  numElems <- list(1, 1, 2, 2, 3, 4)
+  numRdd <- parallelize(sc, numElems)
+
+  # Subtracting an RDD from itself leaves nothing.
+  selfDiff <- collectRDD(subtract(numRdd, numRdd))
+  expect_equal(selfDiff, list())
+
+  # Subtracting an empty RDD keeps every element (compared after sorting).
+  nothingRdd <- parallelize(sc, list())
+  kept <- collectRDD(subtract(numRdd, nothingRdd))
+  keptSorted <- sort(as.vector(kept, mode = "integer"))
+  expect_equal(as.list(keptSorted), numElems)
+
+  # Every occurrence of 2 and 4 is removed.
+  removeRdd <- parallelize(sc, list(2, 4))
+  kept <- collectRDD(subtract(numRdd, removeRdd))
+  keptSorted <- sort(as.vector(kept, mode = "integer"))
+  expect_equal(as.list(keptSorted), list(1, 1, 3))
+
+  # Same check with character elements.
+  strElems <- list("a", "a", "b", "b", "c", "d")
+  strRdd <- parallelize(sc, strElems)
+  removeStrRdd <- parallelize(sc, list("b", "d"))
+  kept <- collectRDD(subtract(strRdd, removeStrRdd))
+  keptSorted <- sort(as.vector(kept, mode = "character"))
+  expect_equal(as.list(keptSorted), list("a", "a", "c"))
+})
+
+test_that("subtractByKey() on pairwise RDDs", {
+  skip_on_cran()
+
+  strPairs <- list(list("a", 1), list("b", 4),
+                   list("b", 5), list("a", 2))
+  strPairRdd <- parallelize(sc, strPairs)
+
+  # Subtracting a pair RDD from itself leaves nothing.
+  selfDiff <- collectRDD(subtractByKey(strPairRdd, strPairRdd))
+  expect_equal(selfDiff, list())
+
+  # Subtracting an empty RDD keeps every pair (compared after sorting by key).
+  nothingRdd <- parallelize(sc, list())
+  kept <- collectRDD(subtractByKey(strPairRdd, nothingRdd))
+  expect_equal(sortKeyValueList(kept), sortKeyValueList(strPairs))
+
+  # Key "a" occurs on both sides, so only the "b" pairs are expected back.
+  otherRdd <- parallelize(sc, list(list("a", 3), list("c", 1)))
+  kept <- collectRDD(subtractByKey(strPairRdd, otherRdd))
+  expect_equal(kept, list(list("b", 4), list("b", 5)))
+
+  # Same scenario with numeric keys: key 1 is removed, key 2 is kept.
+  numPairList <- list(list(1, 1), list(2, 4),
+                      list(2, 5), list(1, 2))
+  numPairRdd <- parallelize(sc, numPairList)
+  otherNumRdd <- parallelize(sc, list(list(1, 3), list(3, 1)))
+  kept <- collectRDD(subtractByKey(numPairRdd, otherNumRdd))
+  expect_equal(kept, list(list(2, 4), list(2, 5)))
+})
+
+test_that("intersection() on RDDs", {
+  skip_on_cran()
+
+  # Intersecting the shared rdd with itself should give back nums.
+  selfCommon <- collectRDD(intersection(rdd, rdd))
+  expect_equal(sort(as.integer(selfCommon)), nums)
+
+  # Intersecting with an empty RDD yields nothing.
+  nothingRdd <- parallelize(sc, list())
+  noneCommon <- collectRDD(intersection(rdd, nothingRdd))
+  expect_equal(noneCommon, list())
+
+  # Only 1, 2 and 3 appear in both inputs.
+  firstRdd <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+  secondRdd <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+  common <- collectRDD(intersection(firstRdd, secondRdd))
+  expect_equal(sort(as.integer(common)), 1:3)
+})
+
+test_that("join() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Numeric keys: only key 1 is present on both sides.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  rhs <- parallelize(sc, list(list(1, 2), list(1, 3)))
+  joined <- collectRDD(joinRDD(lhs, rhs, 2L))
+  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(expected))
+
+  # String keys: only key "a" is present on both sides.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 4)))
+  rhs <- parallelize(sc, list(list("a", 2), list("a", 3)))
+  joined <- collectRDD(joinRDD(lhs, rhs, 2L))
+  expected <- list(list("a", list(1, 2)), list("a", list(1, 3)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(expected))
+
+  # Numeric keys with no overlap: the join is empty.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 2)))
+  rhs <- parallelize(sc, list(list(3, 3), list(4, 4)))
+  joined <- collectRDD(joinRDD(lhs, rhs, 2L))
+  expect_equal(joined, list())
+
+  # String keys with no overlap: the join is empty.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  rhs <- parallelize(sc, list(list("c", 3), list("d", 4)))
+  joined <- collectRDD(joinRDD(lhs, rhs, 2L))
+  expect_equal(joined, list())
+})
+
+test_that("leftOuterJoin() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Numeric keys: key 2 has no match on the right, so its right-hand value is NULL.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  rhs <- parallelize(sc, list(list(1, 2), list(1, 3)))
+  joined <- collectRDD(leftOuterJoin(lhs, rhs, 2L))
+  expect_equal(sortKeyValueList(joined),
+               sortKeyValueList(list(list(1, list(1, 2)),
+                                     list(1, list(1, 3)),
+                                     list(2, list(4, NULL)))))
+
+  # String keys: key "b" has no match on the right.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 4)))
+  rhs <- parallelize(sc, list(list("a", 2), list("a", 3)))
+  joined <- collectRDD(leftOuterJoin(lhs, rhs, 2L))
+  expect_equal(sortKeyValueList(joined),
+               sortKeyValueList(list(list("b", list(4, NULL)),
+                                     list("a", list(1, 2)),
+                                     list("a", list(1, 3)))))
+
+  # Numeric keys with no overlap: every left pair is kept with a NULL partner.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 2)))
+  rhs <- parallelize(sc, list(list(3, 3), list(4, 4)))
+  joined <- collectRDD(leftOuterJoin(lhs, rhs, 2L))
+  expect_equal(sortKeyValueList(joined),
+               sortKeyValueList(list(list(1, list(1, NULL)),
+                                     list(2, list(2, NULL)))))
+
+  # String keys with no overlap.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  rhs <- parallelize(sc, list(list("c", 3), list("d", 4)))
+  joined <- collectRDD(leftOuterJoin(lhs, rhs, 2L))
+  expect_equal(sortKeyValueList(joined),
+               sortKeyValueList(list(list("b", list(2, NULL)),
+                                     list("a", list(1, NULL)))))
+})
+
+test_that("rightOuterJoin() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Numeric keys: key 2 has no match on the left, so its left-hand value is NULL.
+  lhs <- parallelize(sc, list(list(1, 2), list(1, 3)))
+  rhs <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  joined <- collectRDD(rightOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # String keys: key "b" has no match on the left.
+  lhs <- parallelize(sc, list(list("a", 2), list("a", 3)))
+  rhs <- parallelize(sc, list(list("a", 1), list("b", 4)))
+  joined <- collectRDD(rightOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # Numeric keys with no overlap: every right pair is kept with a NULL partner.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 2)))
+  rhs <- parallelize(sc, list(list(3, 3), list(4, 4)))
+  joined <- collectRDD(rightOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # String keys with no overlap.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  rhs <- parallelize(sc, list(list("c", 3), list("d", 4)))
+  joined <- collectRDD(rightOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+})
+
+test_that("fullOuterJoin() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Numeric keys: key 1 matches on both sides, key 2 exists only on the
+  # right and key 3 only on the left.
+  lhs <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
+  rhs <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  joined <- collectRDD(fullOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list(1, list(2, 1)), list(1, list(3, 1)),
+                 list(2, list(NULL, 4)), list(3, list(3, NULL)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # String keys: "a" matches, "b" is right-only and "c" is left-only.
+  lhs <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
+  rhs <- parallelize(sc, list(list("a", 1), list("b", 4)))
+  joined <- collectRDD(fullOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
+                 list("a", list(3, 1)), list("c", list(1, NULL)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # Numeric keys with no overlap: every pair from both sides gets a NULL partner.
+  lhs <- parallelize(sc, list(list(1, 1), list(2, 2)))
+  rhs <- parallelize(sc, list(list(3, 3), list(4, 4)))
+  joined <- collectRDD(fullOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list(1, list(1, NULL)), list(2, list(2, NULL)),
+                 list(3, list(NULL, 3)), list(4, list(NULL, 4)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+
+  # String keys with no overlap.
+  lhs <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  rhs <- parallelize(sc, list(list("c", 3), list("d", 4)))
+  joined <- collectRDD(fullOuterJoin(lhs, rhs, 2L))
+  wanted <- list(list("a", list(1, NULL)), list("b", list(2, NULL)),
+                 list("d", list(NULL, 4)), list("c", list(NULL, 3)))
+  expect_equal(sortKeyValueList(joined), sortKeyValueList(wanted))
+})
+
+test_that("sortByKey() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Descending sort of (x, x) pairs built from the shared rdd.
+  selfPairsRdd <- map(rdd, function(x) { list(x, x) })
+  descending <- collectRDD(sortByKey(selfPairsRdd, ascending = FALSE))
+  selfPairs <- lapply(nums, function(x) { list(x, x) })
+  expect_equal(descending, sortKeyValueList(selfPairs, decreasing = TRUE))
+
+  # Ascending sort (the default) of input that starts out in descending order.
+  reversedRdd <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
+  reversedPairsRdd <- map(reversedRdd, function(x) { list(x, x) })
+  ascending <- collectRDD(sortByKey(reversedPairsRdd))
+  expect_equal(ascending, selfPairs)
+
+  # sort by string keys
+  strPairs <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
+  strPairsSorted <- list(list("1", 3), list("2", 5),
+                         list("a", 1), list("b", 2), list("d", 4))
+  strSorted <- collectRDD(sortByKey(parallelize(sc, strPairs, 2L)))
+  expect_equal(strSorted, strPairsSorted)
+
+  # test on the boundary cases
+
+  # boundary case 1: the RDD to be sorted has only 1 partition
+  onePartIn <- parallelize(sc, strPairs, 1L)
+  expect_equal(collectRDD(sortByKey(onePartIn)), strPairsSorted)
+
+  # boundary case 2: the sorted RDD has only 1 partition
+  twoPartIn <- parallelize(sc, strPairs, 2L)
+  onePartOut <- sortByKey(twoPartIn, numPartitions = 1L)
+  expect_equal(collectRDD(onePartOut), strPairsSorted)
+
+  # boundary case 3: the RDD to be sorted has only 1 element
+  singlePair <- list(list("a", 1))
+  singleRdd <- parallelize(sc, singlePair, 2L)
+  expect_equal(collectRDD(sortByKey(singleRdd)), singlePair)
+
+  # boundary case 4: the RDD to be sorted has no elements
+  noPairs <- list()
+  emptyPairRdd <- parallelize(sc, noPairs, 2L)
+  expect_equal(collectRDD(sortByKey(emptyPairRdd)), noPairs)
+})
+
+test_that("collectAsMap() on a pairwise RDD", {
+  skip_on_cran()
+
+  # In each case the expected map is a list named by the keys.
+
+  # Whole-number keys with numeric values.
+  pairRdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
+  expect_equal(collectAsMap(pairRdd), list(`1` = 2, `3` = 4))
+
+  # String keys.
+  pairRdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
+  expect_equal(collectAsMap(pairRdd), list(a = 1, b = 2))
+
+  # Fractional keys.
+  pairRdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
+  expect_equal(collectAsMap(pairRdd), list(`1.1` = 2.2, `1.2` = 2.4))
+
+  # Whole-number keys with string values.
+  pairRdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
+  expect_equal(collectAsMap(pairRdd), list(`1` = "a", `2` = "b"))
+})
+
+test_that("show()", {
+  skip_on_cran()
+
+  # showRDD() output must match this description pattern.
+  shownPattern <- "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+"
+  singleListRdd <- parallelize(sc, list(1:10))
+  expect_output(showRDD(singleListRdd), shownPattern)
+})
+
+test_that("sampleByKey() on pairwise RDDs", {
+  skip_on_cran()
+
+  # Each key's sampled values are fetched with lookup() once and reused by all
+  # of that key's assertions, rather than calling lookup() again inside every
+  # expectation.
+
+  # String keys: even numbers under "a", odd numbers under "b".
+  # Second argument FALSE is presumably withReplacement -- confirm against sampleByKey.
+  rdd <- parallelize(sc, 1:2000)
+  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
+  fractions <- list(a = 0.2, b = 0.1)
+  sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
+  sampledA <- lookup(sample, "a")
+  sampledB <- lookup(sample, "b")
+  # Sample sizes must fall in a band around fraction * 1000 elements per key.
+  expect_true(100 < length(sampledA) && 300 > length(sampledA))
+  expect_true(50 < length(sampledB) && 150 > length(sampledB))
+  # Smallest and largest sampled values must stay within the input range.
+  expect_true(sampledA[which.min(sampledA)] >= 0)
+  expect_true(sampledA[which.max(sampledA)] <= 2000)
+  expect_true(sampledB[which.min(sampledB)] >= 0)
+  expect_true(sampledB[which.max(sampledB)] <= 2000)
+
+  # Numeric keys: even numbers under 2, odd numbers under 3; second argument TRUE.
+  rdd <- parallelize(sc, 1:2000)
+  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
+  fractions <- list(`2` = 0.2, `3` = 0.1)
+  sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
+  sampledTwo <- lookup(sample, 2)
+  sampledThree <- lookup(sample, 3)
+  expect_true(100 < length(sampledTwo) && 300 > length(sampledTwo))
+  expect_true(50 < length(sampledThree) && 150 > length(sampledThree))
+  expect_true(sampledTwo[which.min(sampledTwo)] >= 0)
+  expect_true(sampledTwo[which.max(sampledTwo)] <= 2000)
+  expect_true(sampledThree[which.min(sampledThree)] >= 0)
+  expect_true(sampledThree[which.max(sampledThree)] <= 2000)
+})
+
+test_that("Test correct concurrency of RRDD.compute()", {
+  skip_on_cran()
+
+  # Build a 100-partition RDD, zip its JVM-side RDD with itself and count the rows.
+  manyParts <- parallelize(sc, 1:1000, 100)
+  identityRdd <- lapply(manyParts, function(x) { x })
+  javaRdd <- getJRDD(identityRdd, "row")
+  zipped <- callJMethod(javaRdd, "zip", javaRdd)
+  expect_equal(callJMethod(zipped, "count"), 1000)
+})
+
+# Stop the SparkR session once all tests above have run (presumably the session
+# opened at the top of this test file, which is outside this view).
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_shuffle.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_shuffle.R b/R/pkg/tests/fulltests/test_shuffle.R
new file mode 100644
index 0000000..18320ea
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_shuffle.R
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("partitionBy, groupByKey, reduceByKey etc.")
+
+# Start a SparkR session for this test file (Hive support disabled) and fetch
+# the JavaSparkContext handle that the parallelize() calls below take as `sc`.
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Shared fixtures: each list of key-value pairs is paired with an RDD built from it.
+
+# Integer keys 1L and 2L, two values per key, split into 2 partitions.
+intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
+intRdd <- parallelize(sc, intPairs, 2L)
+
+# Same values as intPairs but with double keys 1.5 and 2.5.
+doublePairs <- list(list(1.5, -1), list(2.5, 100), list(2.5, 1), list(1.5, 200))
+doubleRdd <- parallelize(sc, doublePairs, 2L)
+
+# Five pairs with integer keys 1L-4L (key 3L appears twice), one partition per pair.
+numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1),
+ list(3L, 0))
+numPairsRdd <- parallelize(sc, numPairs, length(numPairs))
+
+# Two lines of text, parallelized into 4 partitions.
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge and ",
+ "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ")
+strListRDD <- parallelize(sc, strList, 4)
+
+test_that("groupByKey for integers", {
+  skip_on_cran()
+
+  # Values of intRdd gathered per integer key, compared after sorting by key.
+  byKey <- collectRDD(groupByKey(intRdd, 2L))
+  wanted <- list(list(2L, list(100, 1)), list(1L, list(-1, 200)))
+  expect_equal(sortKeyValueList(byKey), sortKeyValueList(wanted))
+})
+
+test_that("groupByKey for doubles", {
+  skip_on_cran()
+
+  # Values of doubleRdd gathered per double key, compared after sorting by key.
+  byKey <- collectRDD(groupByKey(doubleRdd, 2L))
+  wanted <- list(list(1.5, list(-1, 200)), list(2.5, list(100, 1)))
+  expect_equal(sortKeyValueList(byKey), sortKeyValueList(wanted))
+})
+
+test_that("reduceByKey for ints", {
+  skip_on_cran()
+
+  # Values of intRdd summed per key: 100 + 1 for key 2L, -1 + 200 for key 1L.
+  summed <- collectRDD(reduceByKey(intRdd, "+", 2L))
+  wanted <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(summed), sortKeyValueList(wanted))
+})
+
+test_that("reduceByKey for doubles", {
+  skip_on_cran()
+
+  # Values of doubleRdd summed per key: -1 + 200 for key 1.5, 100 + 1 for key 2.5.
+  summed <- collectRDD(reduceByKey(doubleRdd, "+", 2L))
+  wanted <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(summed), sortKeyValueList(wanted))
+})
+
+test_that("combineByKey for ints", {
+  skip_on_cran()
+
+  # Identity as the first function and "+" for both merge steps, so the
+  # expected result is the per-key sum of intRdd's values.
+  combinedRdd <- combineByKey(intRdd, function(x) { x }, "+", "+", 2L)
+  combined <- collectRDD(combinedRdd)
+  wanted <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(combined), sortKeyValueList(wanted))
+})
+
+test_that("combineByKey for doubles", {
+  skip_on_cran()
+
+  # Identity as the first function and "+" for both merge steps, so the
+  # expected result is the per-key sum of doubleRdd's values.
+  combinedRdd <- combineByKey(doubleRdd, function(x) { x }, "+", "+", 2L)
+  combined <- collectRDD(combinedRdd)
+  wanted <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(combined), sortKeyValueList(wanted))
+})
+
+test_that("combineByKey for characters", {
+  skip_on_cran()
+
+  # String keys; "max" occurs twice (1L + 4L), the other keys once.
+  namedPairs <- list(list("max", 1L), list("min", 2L),
+                     list("other", 3L), list("max", 4L))
+  namedRdd <- parallelize(sc, namedPairs, 2L)
+  combinedRdd <- combineByKey(namedRdd, function(x) { x }, "+", "+", 2L)
+  combined <- collectRDD(combinedRdd)
+
+  wanted <- list(list("max", 5L), list("min", 2L), list("other", 3L))
+  expect_equal(sortKeyValueList(combined), sortKeyValueList(wanted))
+})
+
+test_that("aggregateByKey", {
+  skip_on_cran()
+
+  # The accumulator is list(running sum, running count). Both scenarios use
+  # the same zero value and the same pair of operators, so they are defined once.
+  zero <- list(0, 0)
+  addValue <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  mergeAccs <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+
+  # test aggregateByKey for int keys
+  numKeyRdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+  aggregated <- collectRDD(aggregateByKey(numKeyRdd, zero, addValue, mergeAccs, 2L))
+  wanted <- list(list(1, list(3, 2)), list(2, list(7, 2)))
+  expect_equal(sortKeyValueList(aggregated), sortKeyValueList(wanted))
+
+  # test aggregateByKey for string keys
+  strKeyRdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
+  aggregated <- collectRDD(aggregateByKey(strKeyRdd, zero, addValue, mergeAccs, 2L))
+  wanted <- list(list("a", list(3, 2)), list("b", list(7, 2)))
+  expect_equal(sortKeyValueList(aggregated), sortKeyValueList(wanted))
+})
+
+test_that("foldByKey", {
+  skip_on_cran()
+
+  # test foldByKey for int keys
+  foldedInts <- collectRDD(foldByKey(intRdd, 0, "+", 2L))
+  wanted <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(foldedInts), sortKeyValueList(wanted))
+
+  # test foldByKey for double keys
+  foldedDoubles <- collectRDD(foldByKey(doubleRdd, 0, "+", 2L))
+  wanted <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(foldedDoubles), sortKeyValueList(wanted))
+
+  # test foldByKey for string keys
+  strPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
+  strPairRdd <- parallelize(sc, strPairs)
+  foldedStrings <- collectRDD(foldByKey(strPairRdd, 0, "+", 2L))
+  wanted <- list(list("b", 101), list("a", 199))
+  expect_equal(sortKeyValueList(foldedStrings), sortKeyValueList(wanted))
+
+  # test foldByKey for empty pair RDD
+  noPairsRdd <- parallelize(sc, list())
+  foldedEmpty <- collectRDD(foldByKey(noPairsRdd, 0, "+", 2L))
+  expect_equal(foldedEmpty, list())
+
+  # test foldByKey for RDD with only 1 pair
+  onePairRdd <- parallelize(sc, list(list(1, 1)))
+  foldedSingle <- collectRDD(foldByKey(onePairRdd, 0, "+", 2L))
+  expect_equal(foldedSingle, list(list(1, 1)))
+})
+
+test_that("partitionBy() partitions data correctly", {
+  skip_on_cran()
+
+  # Partition function: keys of 3 or more map to 1, smaller keys map to 0.
+  byMagnitude <- function(key) { if (key >= 3) 1 else 0 }
+  partitioned <- partitionByRDD(numPairsRdd, 2L, byMagnitude)
+
+  # Expected contents of partition 0 (keys below 3) and partition 1 (keys of 3 or more).
+  lowKeys <- list(list(1, 100), list(2, 200))
+  highKeys <- list(list(4, -1), list(3, 1), list(3, 0))
+  partZero <- collectPartition(partitioned, 0L)
+  partOne <- collectPartition(partitioned, 1L)
+
+  expect_equal(sortKeyValueList(partZero), sortKeyValueList(lowKeys))
+  expect_equal(sortKeyValueList(partOne), sortKeyValueList(highKeys))
+})
+
+test_that("partitionBy works with dependencies", {
+  skip_on_cran()
+
+  # The partition function depends on a variable defined outside its body.
+  # It returns 7 for odd keys and 4 for even keys; the expectations below
+  # place even keys in partition 0 and odd keys in partition 1 (presumably
+  # the returned value is reduced modulo numPartitions -- confirm against
+  # partitionByRDD).
+  oddRemainder <- 1
+  byParity <- function(key) { if (key %% 2 == oddRemainder) 7 else 4 }
+
+  # Partition by parity
+  partitioned <- partitionByRDD(numPairsRdd, numPartitions = 2L, byParity)
+
+  # even keys (2 and 4)
+  evenKeys <- list(list(2, 200), list(4, -1))
+  # odd keys (1 and 3)
+  oddKeys <- list(list(1, 100), list(3, 1), list(3, 0))
+  partZero <- collectPartition(partitioned, 0L)
+  partOne <- collectPartition(partitioned, 1L)
+
+  expect_equal(sortKeyValueList(partZero), sortKeyValueList(evenKeys))
+  expect_equal(sortKeyValueList(partOne), sortKeyValueList(oddKeys))
+})
+
+test_that("test partitionBy with string keys", {
+  skip_on_cran()
+
+  # Split every line into words and pair each word with a count of 1.
+  wordRdd <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] })
+  wordPairs <- lapply(wordRdd, function(word) { list(word, 1L) })
+
+  # No partition function is passed here. Both "Dexter" pairs are expected in
+  # partition 0 and both "and" pairs in partition 1.
+  partitioned <- partitionByRDD(wordPairs, 2L)
+  wantedDexter <- list(list("Dexter", 1), list("Dexter", 1))
+  wantedAnd <- list(list("and", 1), list("and", 1))
+
+  partZero <- collectPartition(partitioned, 0L)
+  foundDexter <- Filter(function(item) { item[[1]] == "Dexter" }, partZero)
+  partOne <- collectPartition(partitioned, 1L)
+  foundAnd <- Filter(function(item) { item[[1]] == "and" }, partOne)
+
+  expect_equal(sortKeyValueList(foundDexter), sortKeyValueList(wantedDexter))
+  expect_equal(sortKeyValueList(foundAnd), sortKeyValueList(wantedAnd))
+})
+
+# Stop the SparkR session once all tests in this file have run.
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkR.R b/R/pkg/tests/fulltests/test_sparkR.R
new file mode 100644
index 0000000..a40981c
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_sparkR.R
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# testthat context label under which the tests below are reported.
+context("functions in sparkR.R")
+
+test_that("sparkCheckInstall", {
+  skip_on_cran()
+
+  # "local, yarn-client, mesos-client" mode, SPARK_HOME was set correctly,
+  # and the SparkR job was submitted by "spark-submit":
+  # an existing directory with empty master/deployMode must yield NULL.
+  existingHome <- file.path(tempdir(), "sparkHome")
+  dir.create(existingHome)
+  expect_true(is.null(sparkCheckInstall(existingHome, "", "")))
+  unlink(existingHome, recursive = TRUE)
+
+  # "yarn-cluster, mesos-cluster" mode, SPARK_HOME was not set,
+  # and the SparkR job was submitted by "spark-submit":
+  # an empty SPARK_HOME with empty master/deployMode must also yield NULL.
+  expect_true(is.null(sparkCheckInstall("", "", "")))
+
+  # "yarn-client, mesos-client" mode, SPARK_HOME was not set:
+  # an error is expected whether client mode is given through the master
+  # or through the deploy mode.
+  expect_error(sparkCheckInstall("", "yarn-client", ""))
+  expect_error(sparkCheckInstall("", "", "client"))
+})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org