You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by fe...@apache.org on 2017/09/10 17:24:58 UTC
[8/9] spark git commit: [SPARKR][BACKPORT-2.1] backporting package
and test changes
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
deleted file mode 100644
index 8fe3a87..0000000
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ /dev/null
@@ -1,1205 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib functions")
-
-# Tests for MLlib functions in SparkR
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
-
-absoluteSparkPath <- function(x) {
- sparkHome <- sparkR.conf("spark.home")
- file.path(sparkHome, x)
-}
-
-test_that("formula of spark.glm", {
- training <- suppressWarnings(createDataFrame(iris))
- # directly calling the spark API
- # dot minus and intercept vs native glm
- model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # feature interaction vs native glm
- model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # glm should work with long formula
- training <- suppressWarnings(createDataFrame(iris))
- training$LongLongLongLongLongName <- training$Sepal_Width
- training$VeryLongLongLongLonLongName <- training$Sepal_Length
- training$AnotherLongLongLongLongName <- training$Species
- model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName +
- AnotherLongLongLongLongName)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("spark.glm and predict", {
- training <- suppressWarnings(createDataFrame(iris))
- # gaussian family
- model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
- prediction <- predict(model, training)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
- vals <- collect(select(prediction, "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # poisson family
- model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
- family = poisson(link = identity))
- prediction <- predict(model, training)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
- vals <- collect(select(prediction, "prediction"))
- rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
- data = iris, family = poisson(link = identity)), iris))
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # Gamma family
- x <- runif(100, -1, 1)
- y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
- df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
- model <- glm(y ~ x, family = Gamma, df)
- out <- capture.output(print(summary(model)))
- expect_true(any(grepl("Dispersion parameter for gamma family", out)))
-
- # Test stats::predict is working
- x <- rnorm(15)
- y <- x + rnorm(15)
- expect_equal(length(predict(lm(y ~ x))), 15)
-})
-
-test_that("spark.glm summary", {
- # gaussian family
- training <- suppressWarnings(createDataFrame(iris))
- stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
-
- rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
-
- coefs <- unlist(stats$coefficients)
- rCoefs <- unlist(rStats$coefficients)
- expect_true(all(abs(rCoefs - coefs) < 1e-4))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
- expect_equal(stats$dispersion, rStats$dispersion)
- expect_equal(stats$null.deviance, rStats$null.deviance)
- expect_equal(stats$deviance, rStats$deviance)
- expect_equal(stats$df.null, rStats$df.null)
- expect_equal(stats$df.residual, rStats$df.residual)
- expect_equal(stats$aic, rStats$aic)
-
- out <- capture.output(print(stats))
- expect_match(out[2], "Deviance Residuals:")
- expect_true(any(grepl("AIC: 59.22", out)))
-
- # binomial family
- df <- suppressWarnings(createDataFrame(iris))
- training <- df[df$Species %in% c("versicolor", "virginica"), ]
- stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width,
- family = binomial(link = "logit")))
-
- rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
- rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
- family = binomial(link = "logit")))
-
- coefs <- unlist(stats$coefficients)
- rCoefs <- unlist(rStats$coefficients)
- expect_true(all(abs(rCoefs - coefs) < 1e-4))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Sepal_Width")))
- expect_equal(stats$dispersion, rStats$dispersion)
- expect_equal(stats$null.deviance, rStats$null.deviance)
- expect_equal(stats$deviance, rStats$deviance)
- expect_equal(stats$df.null, rStats$df.null)
- expect_equal(stats$df.residual, rStats$df.residual)
- expect_equal(stats$aic, rStats$aic)
-
- # Test spark.glm works with weighted dataset
- a1 <- c(0, 1, 2, 3)
- a2 <- c(5, 2, 1, 3)
- w <- c(1, 2, 3, 4)
- b <- c(1, 0, 1, 0)
- data <- as.data.frame(cbind(a1, a2, w, b))
- df <- suppressWarnings(createDataFrame(data))
-
- stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w"))
- rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w))
-
- coefs <- unlist(stats$coefficients)
- rCoefs <- unlist(rStats$coefficients)
- expect_true(all(abs(rCoefs - coefs) < 1e-3))
- expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2")))
- expect_equal(stats$dispersion, rStats$dispersion)
- expect_equal(stats$null.deviance, rStats$null.deviance)
- expect_equal(stats$deviance, rStats$deviance)
- expect_equal(stats$df.null, rStats$df.null)
- expect_equal(stats$df.residual, rStats$df.residual)
- expect_equal(stats$aic, rStats$aic)
-
- # Test summary works on base GLM models
- baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
- baseSummary <- summary(baseModel)
- expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
-
- # Test spark.glm works with regularization parameter
- data <- as.data.frame(cbind(a1, a2, b))
- df <- suppressWarnings(createDataFrame(data))
- regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
- expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
-
- # Test spark.glm works on collinear data
- A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
- b <- c(1, 2, 3, 4)
- data <- as.data.frame(cbind(A, b))
- df <- createDataFrame(data)
- stats <- summary(spark.glm(df, b ~ . - 1))
- coefs <- unlist(stats$coefficients)
- expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4))
-})
-
-test_that("spark.glm save/load", {
- training <- suppressWarnings(createDataFrame(iris))
- m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
- s <- summary(m)
-
- modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp")
- write.ml(m, modelPath)
- expect_error(write.ml(m, modelPath))
- write.ml(m, modelPath, overwrite = TRUE)
- m2 <- read.ml(modelPath)
- s2 <- summary(m2)
-
- expect_equal(s$coefficients, s2$coefficients)
- expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
- expect_equal(s$dispersion, s2$dispersion)
- expect_equal(s$null.deviance, s2$null.deviance)
- expect_equal(s$deviance, s2$deviance)
- expect_equal(s$df.null, s2$df.null)
- expect_equal(s$df.residual, s2$df.residual)
- expect_equal(s$aic, s2$aic)
- expect_equal(s$iter, s2$iter)
- expect_true(!s$is.loaded)
- expect_true(s2$is.loaded)
-
- unlink(modelPath)
-})
-
-
-
-test_that("formula of glm", {
- training <- suppressWarnings(createDataFrame(iris))
- # dot minus and intercept vs native glm
- model <- glm(Sepal_Width ~ . - Species + 0, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # feature interaction vs native glm
- model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # glm should work with long formula
- training <- suppressWarnings(createDataFrame(iris))
- training$LongLongLongLongLongName <- training$Sepal_Width
- training$VeryLongLongLongLonLongName <- training$Sepal_Length
- training$AnotherLongLongLongLongName <- training$Species
- model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
- data = training)
- vals <- collect(select(predict(model, training), "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("glm and predict", {
- training <- suppressWarnings(createDataFrame(iris))
- # gaussian family
- model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
- prediction <- predict(model, training)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
- vals <- collect(select(prediction, "prediction"))
- rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # poisson family
- model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
- family = poisson(link = identity))
- prediction <- predict(model, training)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
- vals <- collect(select(prediction, "prediction"))
- rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
- data = iris, family = poisson(link = identity)), iris))
- expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
- # Test stats::predict is working
- x <- rnorm(15)
- y <- x + rnorm(15)
- expect_equal(length(predict(lm(y ~ x))), 15)
-})
-
-test_that("glm summary", {
- # gaussian family
- training <- suppressWarnings(createDataFrame(iris))
- stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))
-
- rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))
-
- coefs <- unlist(stats$coefficients)
- rCoefs <- unlist(rStats$coefficients)
- expect_true(all(abs(rCoefs - coefs) < 1e-4))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica")))
- expect_equal(stats$dispersion, rStats$dispersion)
- expect_equal(stats$null.deviance, rStats$null.deviance)
- expect_equal(stats$deviance, rStats$deviance)
- expect_equal(stats$df.null, rStats$df.null)
- expect_equal(stats$df.residual, rStats$df.residual)
- expect_equal(stats$aic, rStats$aic)
-
- # binomial family
- df <- suppressWarnings(createDataFrame(iris))
- training <- df[df$Species %in% c("versicolor", "virginica"), ]
- stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
- family = binomial(link = "logit")))
-
- rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ]
- rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
- family = binomial(link = "logit")))
-
- coefs <- unlist(stats$coefficients)
- rCoefs <- unlist(rStats$coefficients)
- expect_true(all(abs(rCoefs - coefs) < 1e-4))
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "Sepal_Length", "Sepal_Width")))
- expect_equal(stats$dispersion, rStats$dispersion)
- expect_equal(stats$null.deviance, rStats$null.deviance)
- expect_equal(stats$deviance, rStats$deviance)
- expect_equal(stats$df.null, rStats$df.null)
- expect_equal(stats$df.residual, rStats$df.residual)
- expect_equal(stats$aic, rStats$aic)
-
- # Test summary works on base GLM models
- baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
- baseSummary <- summary(baseModel)
- expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
-})
-
-test_that("glm save/load", {
- training <- suppressWarnings(createDataFrame(iris))
- m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
- s <- summary(m)
-
- modelPath <- tempfile(pattern = "glm", fileext = ".tmp")
- write.ml(m, modelPath)
- expect_error(write.ml(m, modelPath))
- write.ml(m, modelPath, overwrite = TRUE)
- m2 <- read.ml(modelPath)
- s2 <- summary(m2)
-
- expect_equal(s$coefficients, s2$coefficients)
- expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
- expect_equal(s$dispersion, s2$dispersion)
- expect_equal(s$null.deviance, s2$null.deviance)
- expect_equal(s$deviance, s2$deviance)
- expect_equal(s$df.null, s2$df.null)
- expect_equal(s$df.residual, s2$df.residual)
- expect_equal(s$aic, s2$aic)
- expect_equal(s$iter, s2$iter)
- expect_true(!s$is.loaded)
- expect_true(s2$is.loaded)
-
- unlink(modelPath)
-})
-
-test_that("spark.kmeans", {
- newIris <- iris
- newIris$Species <- NULL
- training <- suppressWarnings(createDataFrame(newIris))
-
- take(training, 1)
-
- model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random")
- sample <- take(select(predict(model, training), "prediction"), 1)
- expect_equal(typeof(sample$prediction), "integer")
- expect_equal(sample$prediction, 1)
-
- # Test stats::kmeans is working
- statsModel <- kmeans(x = newIris, centers = 2)
- expect_equal(sort(unique(statsModel$cluster)), c(1, 2))
-
- # Test fitted works on KMeans
- fitted.model <- fitted(model)
- expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
-
- # Test summary works on KMeans
- summary.model <- summary(model)
- cluster <- summary.model$cluster
- k <- summary.model$k
- expect_equal(k, 2)
- expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- summary2 <- summary(model2)
- expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
- expect_equal(summary.model$coefficients, summary2$coefficients)
- expect_true(!summary.model$is.loaded)
- expect_true(summary2$is.loaded)
-
- unlink(modelPath)
-
- # Test Kmeans on dataset that is sensitive to seed value
- col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
- col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
- col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
- cols <- as.data.frame(cbind(col1, col2, col3))
- df <- createDataFrame(cols)
-
- model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
- initMode = "random", seed = 1, tol = 1E-5)
- model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
- initMode = "random", seed = 22222, tol = 1E-5)
-
- summary.model1 <- summary(model1)
- summary.model2 <- summary(model2)
- cluster1 <- summary.model1$cluster
- cluster2 <- summary.model2$cluster
- clusterSize1 <- summary.model1$clusterSize
- clusterSize2 <- summary.model2$clusterSize
-
- # The predicted clusters are different
- expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
- c(0, 1, 2, 3))
- expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
- c(0, 1, 2))
- expect_equal(clusterSize1, 4)
- expect_equal(clusterSize2, 3)
-})
-
-test_that("spark.mlp", {
- df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
- source = "libsvm")
- model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
- solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
-
- # Test summary method
- summary <- summary(model)
- expect_equal(summary$numOfInputs, 4)
- expect_equal(summary$numOfOutputs, 3)
- expect_equal(summary$layers, c(4, 5, 4, 3))
- expect_equal(length(summary$weights), 64)
- expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
- tolerance = 1e-6)
-
- # Test predict method
- mlpTestDF <- df
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- summary2 <- summary(model2)
-
- expect_equal(summary2$numOfInputs, 4)
- expect_equal(summary2$numOfOutputs, 3)
- expect_equal(summary2$layers, c(4, 5, 4, 3))
- expect_equal(length(summary2$weights), 64)
-
- unlink(modelPath)
-
- # Test default parameter
- model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
- # Test illegal parameter
- expect_error(spark.mlp(df, label ~ features, layers = NULL),
- "layers must be a integer vector with length > 1.")
- expect_error(spark.mlp(df, label ~ features, layers = c()),
- "layers must be a integer vector with length > 1.")
- expect_error(spark.mlp(df, label ~ features, layers = c(3)),
- "layers must be a integer vector with length > 1.")
-
- # Test random seed
- # default seed
- model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
- # seed equals 10
- model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
- # test initialWeights
- model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
- c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
- model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
- c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
-
- model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
- mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
- expect_equal(head(mlpPredictions$prediction, 10),
- c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
-
- # Test formula works well
- df <- suppressWarnings(createDataFrame(iris))
- model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
- layers = c(4, 3))
- summary <- summary(model)
- expect_equal(summary$numOfInputs, 4)
- expect_equal(summary$numOfOutputs, 3)
- expect_equal(summary$layers, c(4, 3))
- expect_equal(length(summary$weights), 15)
- expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
- -10.2376130), tolerance = 1e-6)
-})
-
-test_that("spark.naiveBayes", {
- # R code to reproduce the result.
- # We do not support instance weights yet. So we ignore the frequencies.
- #
- #' library(e1071)
- #' t <- as.data.frame(Titanic)
- #' t1 <- t[t$Freq > 0, -5]
- #' m <- naiveBayes(Survived ~ ., data = t1)
- #' m
- #' predict(m, t1)
- #
- # -- output of 'm'
- #
- # A-priori probabilities:
- # Y
- # No Yes
- # 0.4166667 0.5833333
- #
- # Conditional probabilities:
- # Class
- # Y 1st 2nd 3rd Crew
- # No 0.2000000 0.2000000 0.4000000 0.2000000
- # Yes 0.2857143 0.2857143 0.2857143 0.1428571
- #
- # Sex
- # Y Male Female
- # No 0.5 0.5
- # Yes 0.5 0.5
- #
- # Age
- # Y Child Adult
- # No 0.2000000 0.8000000
- # Yes 0.4285714 0.5714286
- #
- # -- output of 'predict(m, t1)'
- #
- # Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No
- #
-
- t <- as.data.frame(Titanic)
- t1 <- t[t$Freq > 0, -5]
- df <- suppressWarnings(createDataFrame(t1))
- m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0)
- s <- summary(m)
- expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6)
- expect_equal(sum(s$apriori), 1)
- expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6)
- p <- collect(select(predict(m, df), "prediction"))
- expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No",
- "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No",
- "Yes", "Yes", "No", "No"))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
- write.ml(m, modelPath)
- expect_error(write.ml(m, modelPath))
- write.ml(m, modelPath, overwrite = TRUE)
- m2 <- read.ml(modelPath)
- s2 <- summary(m2)
- expect_equal(s$apriori, s2$apriori)
- expect_equal(s$tables, s2$tables)
-
- unlink(modelPath)
-
- # Test e1071::naiveBayes
- if (requireNamespace("e1071", quietly = TRUE)) {
- expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
- expect_equal(as.character(predict(m, t1[1, ])), "Yes")
- }
-
- # Test numeric response variable
- t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
- t2 <- t1[-4]
- df <- suppressWarnings(createDataFrame(t2))
- m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
- s <- summary(m)
- expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
- expect_equal(sum(s$apriori), 1)
- expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
-})
-
-test_that("spark.survreg", {
- # R code to reproduce the result.
- #
- #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
- #' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
- #' library(survival)
- #' model <- survreg(Surv(time, status) ~ x + sex, rData)
- #' summary(model)
- #' predict(model, data)
- #
- # -- output of 'summary(model)'
- #
- # Value Std. Error z p
- # (Intercept) 1.315 0.270 4.88 1.07e-06
- # x -0.190 0.173 -1.10 2.72e-01
- # sex -0.253 0.329 -0.77 4.42e-01
- # Log(scale) -1.160 0.396 -2.93 3.41e-03
- #
- # -- output of 'predict(model, data)'
- #
- # 1 2 3 4 5 6 7
- # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
- #
- data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
- list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
- df <- createDataFrame(data, c("time", "status", "x", "sex"))
- model <- spark.survreg(df, Surv(time, status) ~ x + sex)
- stats <- summary(model)
- coefs <- as.vector(stats$coefficients[, 1])
- rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
- expect_equal(coefs, rCoefs, tolerance = 1e-4)
- expect_true(all(
- rownames(stats$coefficients) ==
- c("(Intercept)", "x", "sex", "Log(scale)")))
- p <- collect(select(predict(model, df), "prediction"))
- expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
- 2.390146, 2.891269, 2.891269), tolerance = 1e-4)
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- coefs2 <- as.vector(stats2$coefficients[, 1])
- expect_equal(coefs, coefs2)
- expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))
-
- unlink(modelPath)
-
- # Test survival::survreg
- if (requireNamespace("survival", quietly = TRUE)) {
- rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
- x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
- expect_error(
- model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
- NA)
- expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)
- }
-})
-
-test_that("spark.isotonicRegression", {
- label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
- feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
- weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
- data <- as.data.frame(cbind(label, feature, weight))
- df <- suppressWarnings(createDataFrame(data))
-
- model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
- weightCol = "weight")
- # only allow one variable on the right hand side of the formula
- expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
- result <- summary(model)
- expect_equal(result$predictions, list(7, 5, 4, 4, 1))
-
- # Test model prediction
- predict_data <- list(list(-2.0), list(-1.0), list(0.5),
- list(0.75), list(1.0), list(2.0), list(9.0))
- predict_df <- createDataFrame(predict_data, c("feature"))
- predict_result <- collect(select(predict(model, predict_df), "prediction"))
- expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- expect_equal(result, summary(model2))
-
- unlink(modelPath)
-})
-
-test_that("spark.logit", {
- # R code to reproduce the result.
- # nolint start
- #' library(glmnet)
- #' iris.x = as.matrix(iris[, 1:4])
- #' iris.y = as.factor(as.character(iris[, 5]))
- #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
- #' coef(logit)
- #
- # $setosa
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # 1.0981324
- # Sepal.Length -0.2909860
- # Sepal.Width 0.5510907
- # Petal.Length -0.1915217
- # Petal.Width -0.4211946
- #
- # $versicolor
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # 1.520061e+00
- # Sepal.Length 2.524501e-02
- # Sepal.Width -5.310313e-01
- # Petal.Length 3.656543e-02
- # Petal.Width -3.144464e-05
- #
- # $virginica
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # -2.61819385
- # Sepal.Length 0.26574097
- # Sepal.Width -0.02005932
- # Petal.Length 0.15495629
- # Petal.Width 0.42122607
- # nolint end
-
- # Test multinomial logistic regression againt three classes
- df <- suppressWarnings(createDataFrame(iris))
- model <- spark.logit(df, Species ~ ., regParam = 0.5)
- summary <- summary(model)
- versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
- virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42)
- setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42)
- versicolorCoefs <- unlist(summary$coefficients[, "versicolor"])
- virginicaCoefs <- unlist(summary$coefficients[, "virginica"])
- setosaCoefs <- unlist(summary$coefficients[, "setosa"])
- expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
- expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
- expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1))
-
- # Test model save and load
- modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- coefs <- summary(model)$coefficients
- coefs2 <- summary(model2)$coefficients
- expect_equal(coefs, coefs2)
- unlink(modelPath)
-
- # R code to reproduce the result.
- # nolint start
- #' library(glmnet)
- #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ]
- #' iris.x = as.matrix(iris2[, 1:4])
- #' iris.y = as.factor(as.character(iris2[, 5]))
- #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
- #' coef(logit)
- #
- # $versicolor
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # 3.93844796
- # Sepal.Length -0.13538675
- # Sepal.Width -0.02386443
- # Petal.Length -0.35076451
- # Petal.Width -0.77971954
- #
- # $virginica
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # -3.93844796
- # Sepal.Length 0.13538675
- # Sepal.Width 0.02386443
- # Petal.Length 0.35076451
- # Petal.Width 0.77971954
- #
- #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5)
- #' coef(logit)
- #
- # 5 x 1 sparse Matrix of class "dgCMatrix"
- # s0
- # (Intercept) -6.0824412
- # Sepal.Length 0.2458260
- # Sepal.Width 0.1642093
- # Petal.Length 0.4759487
- # Petal.Width 1.0383948
- #
- # nolint end
-
- # Test multinomial logistic regression againt two classes
- df <- suppressWarnings(createDataFrame(iris))
- training <- df[df$Species %in% c("versicolor", "virginica"), ]
- model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
- summary <- summary(model)
- versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78)
- virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78)
- versicolorCoefs <- unlist(summary$coefficients[, "versicolor"])
- virginicaCoefs <- unlist(summary$coefficients[, "virginica"])
- expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
- expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
-
- # Test binomial logistic regression againt two classes
- model <- spark.logit(training, Species ~ ., regParam = 0.5)
- summary <- summary(model)
- coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
- coefs <- unlist(summary$coefficients[, "Estimate"])
- expect_true(all(abs(coefsR - coefs) < 0.1))
-
- # Test prediction with string label
- prediction <- predict(model, training)
- expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
- expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor",
- "versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
- expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
-
- # Test prediction with numeric label
- label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
- feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
- data <- as.data.frame(cbind(label, feature))
- df <- createDataFrame(data)
- model <- spark.logit(df, label ~ feature)
- prediction <- collect(select(predict(model, df), "prediction"))
- expect_equal(prediction$prediction, c("0.0", "0.0", "1.0", "1.0", "0.0"))
-})
-
-test_that("spark.gaussianMixture", {
- # R code to reproduce the result.
- # nolint start
- #' library(mvtnorm)
- #' set.seed(1)
- #' a <- rmvnorm(7, c(0, 0))
- #' b <- rmvnorm(8, c(10, 10))
- #' data <- rbind(a, b)
- #' model <- mvnormalmixEM(data, k = 2)
- #' model$lambda
- #
- # [1] 0.4666667 0.5333333
- #
- #' model$mu
- #
- # [1] 0.11731091 -0.06192351
- # [1] 10.363673 9.897081
- #
- #' model$sigma
- #
- # [[1]]
- # [,1] [,2]
- # [1,] 0.62049934 0.06880802
- # [2,] 0.06880802 1.27431874
- #
- # [[2]]
- # [,1] [,2]
- # [1,] 0.2961543 0.160783
- # [2,] 0.1607830 1.008878
- # nolint end
- data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808),
- list(0.3295078, -0.8204684), list(0.4874291, 0.7383247),
- list(0.5757814, -0.3053884), list(1.5117812, 0.3898432),
- list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664),
- list(9.9838097, 10.9438362), list(10.8212212, 10.5939013),
- list(10.9189774, 10.7821363), list(10.0745650, 8.0106483),
- list(10.6198257, 9.9438713), list(9.8442045, 8.5292476),
- list(9.5218499, 10.4179416))
- df <- createDataFrame(data, c("x1", "x2"))
- model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2)
- stats <- summary(model)
- rLambda <- c(0.4666667, 0.5333333)
- rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081)
- rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874,
- 0.2961543, 0.160783, 0.1607830, 1.008878)
- expect_equal(stats$lambda, rLambda, tolerance = 1e-3)
- expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3)
- expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3)
- p <- collect(select(predict(model, df), "prediction"))
- expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats$lambda, stats2$lambda)
- expect_equal(unlist(stats$mu), unlist(stats2$mu))
- expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
-
- unlink(modelPath)
-})
-
-test_that("spark.lda with libsvm", {
- text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
- model <- spark.lda(text, optimizer = "em")
-
- stats <- summary(model, 10)
- isDistributed <- stats$isDistributed
- logLikelihood <- stats$logLikelihood
- logPerplexity <- stats$logPerplexity
- vocabSize <- stats$vocabSize
- topics <- stats$topicTopTerms
- weights <- stats$topicTopTermsWeights
- vocabulary <- stats$vocabulary
-
- expect_true(isDistributed)
- expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
- expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
- expect_equal(vocabSize, 11)
- expect_true(is.null(vocabulary))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
-
- expect_true(stats2$isDistributed)
- expect_equal(logLikelihood, stats2$logLikelihood)
- expect_equal(logPerplexity, stats2$logPerplexity)
- expect_equal(vocabSize, stats2$vocabSize)
- expect_equal(vocabulary, stats2$vocabulary)
-
- unlink(modelPath)
-})
-
-test_that("spark.lda with text input", {
- text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
- model <- spark.lda(text, optimizer = "online", features = "value")
-
- stats <- summary(model)
- isDistributed <- stats$isDistributed
- logLikelihood <- stats$logLikelihood
- logPerplexity <- stats$logPerplexity
- vocabSize <- stats$vocabSize
- topics <- stats$topicTopTerms
- weights <- stats$topicTopTermsWeights
- vocabulary <- stats$vocabulary
-
- expect_false(isDistributed)
- expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
- expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
- expect_equal(vocabSize, 10)
- expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
-
- expect_false(stats2$isDistributed)
- expect_equal(logLikelihood, stats2$logLikelihood)
- expect_equal(logPerplexity, stats2$logPerplexity)
- expect_equal(vocabSize, stats2$vocabSize)
- expect_true(all.equal(vocabulary, stats2$vocabulary))
-
- unlink(modelPath)
-})
-
-test_that("spark.posterior and spark.perplexity", {
- text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
- model <- spark.lda(text, features = "value", k = 3)
-
- # Assert perplexities are equal
- stats <- summary(model)
- logPerplexity <- spark.perplexity(model, text)
- expect_equal(logPerplexity, stats$logPerplexity)
-
- # Assert the sum of every topic distribution is equal to 1
- posterior <- spark.posterior(model, text)
- local.posterior <- collect(posterior)$topicDistribution
- expect_equal(length(local.posterior), sum(unlist(local.posterior)))
-})
-
-test_that("spark.als", {
- data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
- list(2, 1, 1.0), list(2, 2, 5.0))
- df <- createDataFrame(data, c("user", "item", "score"))
- model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item",
- rank = 10, maxIter = 5, seed = 0, regParam = 0.1)
- stats <- summary(model)
- expect_equal(stats$rank, 10)
- test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item"))
- predictions <- collect(predict(model, test))
-
- expect_equal(predictions$prediction, c(-0.1380762, 2.6258414, -1.5018409),
- tolerance = 1e-4)
-
- # Test model save/load
- modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats2$rating, "score")
- userFactors <- collect(stats$userFactors)
- itemFactors <- collect(stats$itemFactors)
- userFactors2 <- collect(stats2$userFactors)
- itemFactors2 <- collect(stats2$itemFactors)
-
- orderUser <- order(userFactors$id)
- orderUser2 <- order(userFactors2$id)
- expect_equal(userFactors$id[orderUser], userFactors2$id[orderUser2])
- expect_equal(userFactors$features[orderUser], userFactors2$features[orderUser2])
-
- orderItem <- order(itemFactors$id)
- orderItem2 <- order(itemFactors2$id)
- expect_equal(itemFactors$id[orderItem], itemFactors2$id[orderItem2])
- expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2])
-
- unlink(modelPath)
-})
-
-test_that("spark.kstest", {
- data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5))
- df <- createDataFrame(data)
- testResult <- spark.kstest(df, "test", "norm")
- stats <- summary(testResult)
-
- rStats <- ks.test(data$test, "pnorm", alternative = "two.sided")
-
- expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
- expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
- expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
-
- testResult <- spark.kstest(df, "test", "norm", -0.5)
- stats <- summary(testResult)
-
- rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided")
-
- expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4)
- expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4)
- expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:")
-
- # Test print.summary.KSTest
- printStats <- capture.output(print.summary.KSTest(stats))
- expect_match(printStats[1], "Kolmogorov-Smirnov test summary:")
- expect_match(printStats[5],
- "Low presumption against null hypothesis: Sample follows theoretical distribution. ")
-})
-
-test_that("spark.randomForest", {
- # regression
- data <- suppressWarnings(createDataFrame(longley))
- model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
- numTrees = 1)
-
- predictions <- collect(predict(model, data))
- expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
- 63.221, 63.639, 64.989, 63.761,
- 66.019, 67.857, 68.169, 66.513,
- 68.655, 69.564, 69.331, 70.551),
- tolerance = 1e-4)
-
- stats <- summary(model)
- expect_equal(stats$numTrees, 1)
- expect_error(capture.output(stats), NA)
- expect_true(length(capture.output(stats)) > 6)
-
- model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16,
- numTrees = 20, seed = 123)
- predictions <- collect(predict(model, data))
- expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070,
- 63.53160, 64.05470, 65.12710, 64.30450,
- 66.70910, 67.86125, 68.08700, 67.21865,
- 68.89275, 69.53180, 69.39640, 69.68250),
-
- tolerance = 1e-4)
- stats <- summary(model)
- expect_equal(stats$numTrees, 20)
-
- modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats$formula, stats2$formula)
- expect_equal(stats$numFeatures, stats2$numFeatures)
- expect_equal(stats$features, stats2$features)
- expect_equal(stats$featureImportances, stats2$featureImportances)
- expect_equal(stats$numTrees, stats2$numTrees)
- expect_equal(stats$treeWeights, stats2$treeWeights)
-
- unlink(modelPath)
-
- # classification
- data <- suppressWarnings(createDataFrame(iris))
- model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification",
- maxDepth = 5, maxBins = 16)
-
- stats <- summary(model)
- expect_equal(stats$numFeatures, 2)
- expect_equal(stats$numTrees, 20)
- expect_error(capture.output(stats), NA)
- expect_true(length(capture.output(stats)) > 6)
- # Test string prediction values
- predictions <- collect(predict(model, data))$prediction
- expect_equal(length(grep("setosa", predictions)), 50)
- expect_equal(length(grep("versicolor", predictions)), 50)
-
- modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats$depth, stats2$depth)
- expect_equal(stats$numNodes, stats2$numNodes)
- expect_equal(stats$numClasses, stats2$numClasses)
-
- unlink(modelPath)
-
- # Test numeric response variable
- labelToIndex <- function(species) {
- switch(as.character(species),
- setosa = 0.0,
- versicolor = 1.0,
- virginica = 2.0
- )
- }
- iris$NumericSpecies <- lapply(iris$Species, labelToIndex)
- data <- suppressWarnings(createDataFrame(iris[-5]))
- model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification",
- maxDepth = 5, maxBins = 16)
- stats <- summary(model)
- expect_equal(stats$numFeatures, 2)
- expect_equal(stats$numTrees, 20)
- # Test numeric prediction values
- predictions <- collect(predict(model, data))$prediction
- expect_equal(length(grep("1.0", predictions)), 50)
- expect_equal(length(grep("2.0", predictions)), 50)
-
- # spark.randomForest classification can work on libsvm data
- data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
- source = "libsvm")
- model <- spark.randomForest(data, label ~ features, "classification")
- expect_equal(summary(model)$numFeatures, 4)
-})
-
-test_that("spark.gbt", {
- # regression
- data <- suppressWarnings(createDataFrame(longley))
- model <- spark.gbt(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, seed = 123)
- predictions <- collect(predict(model, data))
- expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187,
- 63.221, 63.639, 64.989, 63.761,
- 66.019, 67.857, 68.169, 66.513,
- 68.655, 69.564, 69.331, 70.551),
- tolerance = 1e-4)
- stats <- summary(model)
- expect_equal(stats$numTrees, 20)
- expect_equal(stats$formula, "Employed ~ .")
- expect_equal(stats$numFeatures, 6)
- expect_equal(length(stats$treeWeights), 20)
-
- modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats$formula, stats2$formula)
- expect_equal(stats$numFeatures, stats2$numFeatures)
- expect_equal(stats$features, stats2$features)
- expect_equal(stats$featureImportances, stats2$featureImportances)
- expect_equal(stats$numTrees, stats2$numTrees)
- expect_equal(stats$treeWeights, stats2$treeWeights)
-
- unlink(modelPath)
-
- # classification
- # label must be binary - GBTClassifier currently only supports binary classification.
- iris2 <- iris[iris$Species != "virginica", ]
- data <- suppressWarnings(createDataFrame(iris2))
- model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification")
- stats <- summary(model)
- expect_equal(stats$numFeatures, 2)
- expect_equal(stats$numTrees, 20)
- expect_error(capture.output(stats), NA)
- expect_true(length(capture.output(stats)) > 6)
- predictions <- collect(predict(model, data))$prediction
- # test string prediction values
- expect_equal(length(grep("setosa", predictions)), 50)
- expect_equal(length(grep("versicolor", predictions)), 50)
-
- modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp")
- write.ml(model, modelPath)
- expect_error(write.ml(model, modelPath))
- write.ml(model, modelPath, overwrite = TRUE)
- model2 <- read.ml(modelPath)
- stats2 <- summary(model2)
- expect_equal(stats$depth, stats2$depth)
- expect_equal(stats$numNodes, stats2$numNodes)
- expect_equal(stats$numClasses, stats2$numClasses)
-
- unlink(modelPath)
-
- iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1)
- df <- suppressWarnings(createDataFrame(iris2))
- m <- spark.gbt(df, NumericSpecies ~ ., type = "classification")
- s <- summary(m)
- # test numeric prediction values
- expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
- expect_equal(s$numFeatures, 5)
- expect_equal(s$numTrees, 20)
-
- # spark.gbt classification can work on libsvm data
- data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
- source = "libsvm")
- model <- spark.gbt(data, label ~ features, "classification")
- expect_equal(summary(model)$numFeatures, 692)
-})
-
-sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_parallelize_collect.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_parallelize_collect.R b/R/pkg/inst/tests/testthat/test_parallelize_collect.R
deleted file mode 100644
index 55972e1..0000000
--- a/R/pkg/inst/tests/testthat/test_parallelize_collect.R
+++ /dev/null
@@ -1,112 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("parallelize() and collect()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
- "violated, but I'm not. No, in fact, I think this is a friendly",
- "message, like \"Hey, wanna play?\" and yes, I want to play. ",
- "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
- "other times it helps me control the chaos.",
- "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
- "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3))
-strPairs <- list(list(strList, strList), list(strList, strList))
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
-jsc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-# Tests
-
-test_that("parallelize() on simple vectors and lists returns an RDD", {
- numVectorRDD <- parallelize(jsc, numVector, 1)
- numVectorRDD2 <- parallelize(jsc, numVector, 10)
- numListRDD <- parallelize(jsc, numList, 1)
- numListRDD2 <- parallelize(jsc, numList, 4)
- strVectorRDD <- parallelize(jsc, strVector, 2)
- strVectorRDD2 <- parallelize(jsc, strVector, 3)
- strListRDD <- parallelize(jsc, strList, 4)
- strListRDD2 <- parallelize(jsc, strList, 1)
-
- rdds <- c(numVectorRDD,
- numVectorRDD2,
- numListRDD,
- numListRDD2,
- strVectorRDD,
- strVectorRDD2,
- strListRDD,
- strListRDD2)
-
- for (rdd in rdds) {
- expect_is(rdd, "RDD")
- expect_true(.hasSlot(rdd, "jrdd")
- && inherits(rdd@jrdd, "jobj")
- && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD"))
- }
-})
-
-test_that("collect(), following a parallelize(), gives back the original collections", {
- numVectorRDD <- parallelize(jsc, numVector, 10)
- expect_equal(collectRDD(numVectorRDD), as.list(numVector))
-
- numListRDD <- parallelize(jsc, numList, 1)
- numListRDD2 <- parallelize(jsc, numList, 4)
- expect_equal(collectRDD(numListRDD), as.list(numList))
- expect_equal(collectRDD(numListRDD2), as.list(numList))
-
- strVectorRDD <- parallelize(jsc, strVector, 2)
- strVectorRDD2 <- parallelize(jsc, strVector, 3)
- expect_equal(collectRDD(strVectorRDD), as.list(strVector))
- expect_equal(collectRDD(strVectorRDD2), as.list(strVector))
-
- strListRDD <- parallelize(jsc, strList, 4)
- strListRDD2 <- parallelize(jsc, strList, 1)
- expect_equal(collectRDD(strListRDD), as.list(strList))
- expect_equal(collectRDD(strListRDD2), as.list(strList))
-})
-
-test_that("regression: collect() following a parallelize() does not drop elements", {
- # 10 %/% 6 = 1, ceiling(10 / 6) = 2
- collLen <- 10
- numPart <- 6
- expected <- runif(collLen)
- actual <- collectRDD(parallelize(jsc, expected, numPart))
- expect_equal(actual, as.list(expected))
-})
-
-test_that("parallelize() and collect() work for lists of pairs (pairwise data)", {
- # use the pairwise logical to indicate pairwise data
- numPairsRDDD1 <- parallelize(jsc, numPairs, 1)
- numPairsRDDD2 <- parallelize(jsc, numPairs, 2)
- numPairsRDDD3 <- parallelize(jsc, numPairs, 3)
- expect_equal(collectRDD(numPairsRDDD1), numPairs)
- expect_equal(collectRDD(numPairsRDDD2), numPairs)
- expect_equal(collectRDD(numPairsRDDD3), numPairs)
- # can also leave out the parameter name, if the params are supplied in order
- strPairsRDDD1 <- parallelize(jsc, strPairs, 1)
- strPairsRDDD2 <- parallelize(jsc, strPairs, 2)
- expect_equal(collectRDD(strPairsRDDD1), strPairs)
- expect_equal(collectRDD(strPairsRDDD2), strPairs)
-})
-
-sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
deleted file mode 100644
index 787ef51..0000000
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ /dev/null
@@ -1,804 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("basic RDD functions")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-# Data
-nums <- 1:10
-rdd <- parallelize(sc, nums, 2L)
-
-intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
-intRdd <- parallelize(sc, intPairs, 2L)
-
-test_that("get number of partitions in RDD", {
- expect_equal(getNumPartitionsRDD(rdd), 2)
- expect_equal(getNumPartitionsRDD(intRdd), 2)
-})
-
-test_that("first on RDD", {
- expect_equal(firstRDD(rdd), 1)
- newrdd <- lapply(rdd, function(x) x + 1)
- expect_equal(firstRDD(newrdd), 2)
-})
-
-test_that("count and length on RDD", {
- expect_equal(countRDD(rdd), 10)
- expect_equal(lengthRDD(rdd), 10)
-})
-
-test_that("count by values and keys", {
- mods <- lapply(rdd, function(x) { x %% 3 })
- actual <- countByValue(mods)
- expected <- list(list(0, 3L), list(1, 4L), list(2, 3L))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- actual <- countByKey(intRdd)
- expected <- list(list(2L, 2L), list(1L, 2L))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("lapply on RDD", {
- multiples <- lapply(rdd, function(x) { 2 * x })
- actual <- collectRDD(multiples)
- expect_equal(actual, as.list(nums * 2))
-})
-
-test_that("lapplyPartition on RDD", {
- sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) })
- actual <- collectRDD(sums)
- expect_equal(actual, list(15, 40))
-})
-
-test_that("mapPartitions on RDD", {
- sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) })
- actual <- collectRDD(sums)
- expect_equal(actual, list(15, 40))
-})
-
-test_that("flatMap() on RDDs", {
- flat <- flatMap(intRdd, function(x) { list(x, x) })
- actual <- collectRDD(flat)
- expect_equal(actual, rep(intPairs, each = 2))
-})
-
-test_that("filterRDD on RDD", {
- filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 })
- actual <- collectRDD(filtered.rdd)
- expect_equal(actual, list(2, 4, 6, 8, 10))
-
- filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd)
- actual <- collectRDD(filtered.rdd)
- expect_equal(actual, list(list(1L, -1)))
-
- # Filter out all elements.
- filtered.rdd <- filterRDD(rdd, function(x) { x > 10 })
- actual <- collectRDD(filtered.rdd)
- expect_equal(actual, list())
-})
-
-test_that("lookup on RDD", {
- vals <- lookup(intRdd, 1L)
- expect_equal(vals, list(-1, 200))
-
- vals <- lookup(intRdd, 3L)
- expect_equal(vals, list())
-})
-
-test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
- rdd2 <- rdd
- for (i in 1:12)
- rdd2 <- lapplyPartitionsWithIndex(
- rdd2, function(partIndex, part) {
- part <- as.list(unlist(part) * partIndex + i)
- })
- rdd2 <- lapply(rdd2, function(x) x + x)
- actual <- collectRDD(rdd2)
- expected <- list(24, 24, 24, 24, 24,
- 168, 170, 172, 174, 176)
- expect_equal(actual, expected)
-})
-
-test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", {
- # RDD
- rdd2 <- rdd
- # PipelinedRDD
- rdd2 <- lapplyPartitionsWithIndex(
- rdd2,
- function(partIndex, part) {
- part <- as.list(unlist(part) * partIndex)
- })
-
- cacheRDD(rdd2)
- expect_true(rdd2@env$isCached)
- rdd2 <- lapply(rdd2, function(x) x)
- expect_false(rdd2@env$isCached)
-
- unpersistRDD(rdd2)
- expect_false(rdd2@env$isCached)
-
- persistRDD(rdd2, "MEMORY_AND_DISK")
- expect_true(rdd2@env$isCached)
- rdd2 <- lapply(rdd2, function(x) x)
- expect_false(rdd2@env$isCached)
-
- unpersistRDD(rdd2)
- expect_false(rdd2@env$isCached)
-
- tempDir <- tempfile(pattern = "checkpoint")
- setCheckpointDir(sc, tempDir)
- checkpoint(rdd2)
- expect_true(rdd2@env$isCheckpointed)
-
- rdd2 <- lapply(rdd2, function(x) x)
- expect_false(rdd2@env$isCached)
- expect_false(rdd2@env$isCheckpointed)
-
- # make sure the data is collectable
- collectRDD(rdd2)
-
- unlink(tempDir)
-})
-
-test_that("reduce on RDD", {
- sum <- reduce(rdd, "+")
- expect_equal(sum, 55)
-
- # Also test with an inline function
- sumInline <- reduce(rdd, function(x, y) { x + y })
- expect_equal(sumInline, 55)
-})
-
-test_that("lapply with dependency", {
- fa <- 5
- multiples <- lapply(rdd, function(x) { fa * x })
- actual <- collectRDD(multiples)
-
- expect_equal(actual, as.list(nums * 5))
-})
-
-test_that("lapplyPartitionsWithIndex on RDDs", {
- func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) }
- actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE)
- expect_equal(actual, list(list(0, 15), list(1, 40)))
-
- pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L)
- partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 }
- mkTup <- function(partIndex, part) { list(partIndex, part) }
- actual <- collectRDD(lapplyPartitionsWithIndex(
- partitionByRDD(pairsRDD, 2L, partitionByParity),
- mkTup),
- FALSE)
- expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))),
- list(1, list(list(4, 8)))))
-})
-
-test_that("sampleRDD() on RDDs", {
- expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums)
-})
-
-test_that("takeSample() on RDDs", {
- # ported from RDDSuite.scala, modified seeds
- data <- parallelize(sc, 1:100, 2L)
- for (seed in 4:5) {
- s <- takeSample(data, FALSE, 20L, seed)
- expect_equal(length(s), 20L)
- expect_equal(length(unique(s)), 20L)
- for (elem in s) {
- expect_true(elem >= 1 && elem <= 100)
- }
- }
- for (seed in 4:5) {
- s <- takeSample(data, FALSE, 200L, seed)
- expect_equal(length(s), 100L)
- expect_equal(length(unique(s)), 100L)
- for (elem in s) {
- expect_true(elem >= 1 && elem <= 100)
- }
- }
- for (seed in 4:5) {
- s <- takeSample(data, TRUE, 20L, seed)
- expect_equal(length(s), 20L)
- for (elem in s) {
- expect_true(elem >= 1 && elem <= 100)
- }
- }
- for (seed in 4:5) {
- s <- takeSample(data, TRUE, 100L, seed)
- expect_equal(length(s), 100L)
- # Chance of getting all distinct elements is astronomically low, so test we
- # got less than 100
- expect_true(length(unique(s)) < 100L)
- }
- for (seed in 4:5) {
- s <- takeSample(data, TRUE, 200L, seed)
- expect_equal(length(s), 200L)
- # Chance of getting all distinct elements is still quite low, so test we
- # got less than 100
- expect_true(length(unique(s)) < 100L)
- }
-})
-
-test_that("mapValues() on pairwise RDDs", {
- multiples <- mapValues(intRdd, function(x) { x * 2 })
- actual <- collectRDD(multiples)
- expected <- lapply(intPairs, function(x) {
- list(x[[1]], x[[2]] * 2)
- })
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-})
-
-test_that("flatMapValues() on pairwise RDDs", {
- l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
- actual <- collectRDD(flatMapValues(l, function(x) { x }))
- expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
-
- # Generate x to x+1 for every value
- actual <- collectRDD(flatMapValues(intRdd, function(x) { x: (x + 1) }))
- expect_equal(actual,
- list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101),
- list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201)))
-})
-
-test_that("reduceByKeyLocally() on PairwiseRDDs", {
- pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L)
- actual <- reduceByKeyLocally(pairs, "+")
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list(1, 6), list(1.1, 3))))
-
- pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3),
- list("bb", 5)), 4L)
- actual <- reduceByKeyLocally(pairs, "+")
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5))))
-})
-
-test_that("distinct() on RDDs", {
- nums.rep2 <- rep(1:10, 2)
- rdd.rep2 <- parallelize(sc, nums.rep2, 2L)
- uniques <- distinctRDD(rdd.rep2)
- actual <- sort(unlist(collectRDD(uniques)))
- expect_equal(actual, nums)
-})
-
-test_that("maximum() on RDDs", {
- max <- maximum(rdd)
- expect_equal(max, 10)
-})
-
-test_that("minimum() on RDDs", {
- min <- minimum(rdd)
- expect_equal(min, 1)
-})
-
-test_that("sumRDD() on RDDs", {
- sum <- sumRDD(rdd)
- expect_equal(sum, 55)
-})
-
-test_that("keyBy on RDDs", {
- func <- function(x) { x * x }
- keys <- keyBy(rdd, func)
- actual <- collectRDD(keys)
- expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
-})
-
-test_that("repartition/coalesce on RDDs", {
- rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements
-
- # repartition
- r1 <- repartitionRDD(rdd, 2)
- expect_equal(getNumPartitionsRDD(r1), 2L)
- count <- length(collectPartition(r1, 0L))
- expect_true(count >= 8 && count <= 12)
-
- r2 <- repartitionRDD(rdd, 6)
- expect_equal(getNumPartitionsRDD(r2), 6L)
- count <- length(collectPartition(r2, 0L))
- expect_true(count >= 0 && count <= 4)
-
- # coalesce
- r3 <- coalesceRDD(rdd, 1)
- expect_equal(getNumPartitionsRDD(r3), 1L)
- count <- length(collectPartition(r3, 0L))
- expect_equal(count, 20)
-})
-
-test_that("sortBy() on RDDs", {
- sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE)
- actual <- collectRDD(sortedRdd)
- expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
-
- rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
- sortedRdd2 <- sortBy(rdd2, function(x) { x * x })
- actual <- collectRDD(sortedRdd2)
- expect_equal(actual, as.list(nums))
-})
-
-test_that("takeOrdered() on RDDs", {
- l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
- rdd <- parallelize(sc, l)
- actual <- takeOrdered(rdd, 6L)
- expect_equal(actual, as.list(sort(unlist(l)))[1:6])
-
- l <- list("e", "d", "c", "d", "a")
- rdd <- parallelize(sc, l)
- actual <- takeOrdered(rdd, 3L)
- expect_equal(actual, as.list(sort(unlist(l)))[1:3])
-})
-
-test_that("top() on RDDs", {
- l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
- rdd <- parallelize(sc, l)
- actual <- top(rdd, 6L)
- expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
-
- l <- list("e", "d", "c", "d", "a")
- rdd <- parallelize(sc, l)
- actual <- top(rdd, 3L)
- expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
-})
-
-test_that("fold() on RDDs", {
- actual <- fold(rdd, 0, "+")
- expect_equal(actual, Reduce("+", nums, 0))
-
- rdd <- parallelize(sc, list())
- actual <- fold(rdd, 0, "+")
- expect_equal(actual, 0)
-})
-
-test_that("aggregateRDD() on RDDs", {
- rdd <- parallelize(sc, list(1, 2, 3, 4))
- zeroValue <- list(0, 0)
- seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
- combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
- actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
- expect_equal(actual, list(10, 4))
-
- rdd <- parallelize(sc, list())
- actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
- expect_equal(actual, list(0, 0))
-})
-
-test_that("zipWithUniqueId() on RDDs", {
- rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collectRDD(zipWithUniqueId(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 4),
- list("d", 2), list("e", 5))
- expect_equal(actual, expected)
-
- rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collectRDD(zipWithUniqueId(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
- list("d", 3), list("e", 4))
- expect_equal(actual, expected)
-})
-
-test_that("zipWithIndex() on RDDs", {
- rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
- actual <- collectRDD(zipWithIndex(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
- list("d", 3), list("e", 4))
- expect_equal(actual, expected)
-
- rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)
- actual <- collectRDD(zipWithIndex(rdd))
- expected <- list(list("a", 0), list("b", 1), list("c", 2),
- list("d", 3), list("e", 4))
- expect_equal(actual, expected)
-})
-
-test_that("glom() on RDD", {
- rdd <- parallelize(sc, as.list(1:4), 2L)
- actual <- collectRDD(glom(rdd))
- expect_equal(actual, list(list(1, 2), list(3, 4)))
-})
-
-test_that("keys() on RDDs", {
- keys <- keys(intRdd)
- actual <- collectRDD(keys)
- expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
-})
-
-test_that("values() on RDDs", {
- values <- values(intRdd)
- actual <- collectRDD(values)
- expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
-})
-
-test_that("pipeRDD() on RDDs", {
- actual <- collectRDD(pipeRDD(rdd, "more"))
- expected <- as.list(as.character(1:10))
- expect_equal(actual, expected)
-
- trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
- actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
- expected <- list("", "1", "2", "3")
- expect_equal(actual, expected)
-
- rev.nums <- 9:0
- rev.rdd <- parallelize(sc, rev.nums, 2L)
- actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
- expected <- as.list(as.character(c(5:9, 0:4)))
- expect_equal(actual, expected)
-})
-
-test_that("zipRDD() on RDDs", {
- rdd1 <- parallelize(sc, 0:4, 2)
- rdd2 <- parallelize(sc, 1000:1004, 2)
- actual <- collectRDD(zipRDD(rdd1, rdd2))
- expect_equal(actual,
- list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004)))
-
- mockFile <- c("Spark is pretty.", "Spark is awesome.")
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName, 1)
- actual <- collectRDD(zipRDD(rdd, rdd))
- expected <- lapply(mockFile, function(x) { list(x, x) })
- expect_equal(actual, expected)
-
- rdd1 <- parallelize(sc, 0:1, 1)
- actual <- collectRDD(zipRDD(rdd1, rdd))
- expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
- expect_equal(actual, expected)
-
- rdd1 <- map(rdd, function(x) { x })
- actual <- collectRDD(zipRDD(rdd, rdd1))
- expected <- lapply(mockFile, function(x) { list(x, x) })
- expect_equal(actual, expected)
-
- unlink(fileName)
-})
-
-test_that("cartesian() on RDDs", {
- rdd <- parallelize(sc, 1:3)
- actual <- collectRDD(cartesian(rdd, rdd))
- expect_equal(sortKeyValueList(actual),
- list(
- list(1, 1), list(1, 2), list(1, 3),
- list(2, 1), list(2, 2), list(2, 3),
- list(3, 1), list(3, 2), list(3, 3)))
-
- # test case where one RDD is empty
- emptyRdd <- parallelize(sc, list())
- actual <- collectRDD(cartesian(rdd, emptyRdd))
- expect_equal(actual, list())
-
- mockFile <- c("Spark is pretty.", "Spark is awesome.")
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName)
- actual <- collectRDD(cartesian(rdd, rdd))
- expected <- list(
- list("Spark is awesome.", "Spark is pretty."),
- list("Spark is awesome.", "Spark is awesome."),
- list("Spark is pretty.", "Spark is pretty."),
- list("Spark is pretty.", "Spark is awesome."))
- expect_equal(sortKeyValueList(actual), expected)
-
- rdd1 <- parallelize(sc, 0:1)
- actual <- collectRDD(cartesian(rdd1, rdd))
- expect_equal(sortKeyValueList(actual),
- list(
- list(0, "Spark is pretty."),
- list(0, "Spark is awesome."),
- list(1, "Spark is pretty."),
- list(1, "Spark is awesome.")))
-
- rdd1 <- map(rdd, function(x) { x })
- actual <- collectRDD(cartesian(rdd, rdd1))
- expect_equal(sortKeyValueList(actual), expected)
-
- unlink(fileName)
-})
-
-test_that("subtract() on RDDs", {
- l <- list(1, 1, 2, 2, 3, 4)
- rdd1 <- parallelize(sc, l)
-
- # subtract by itself
- actual <- collectRDD(subtract(rdd1, rdd1))
- expect_equal(actual, list())
-
- # subtract by an empty RDD
- rdd2 <- parallelize(sc, list())
- actual <- collectRDD(subtract(rdd1, rdd2))
- expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
- l)
-
- rdd2 <- parallelize(sc, list(2, 4))
- actual <- collectRDD(subtract(rdd1, rdd2))
- expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
- list(1, 1, 3))
-
- l <- list("a", "a", "b", "b", "c", "d")
- rdd1 <- parallelize(sc, l)
- rdd2 <- parallelize(sc, list("b", "d"))
- actual <- collectRDD(subtract(rdd1, rdd2))
- expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
- list("a", "a", "c"))
-})
-
-test_that("subtractByKey() on pairwise RDDs", {
- l <- list(list("a", 1), list("b", 4),
- list("b", 5), list("a", 2))
- rdd1 <- parallelize(sc, l)
-
- # subtractByKey by itself
- actual <- collectRDD(subtractByKey(rdd1, rdd1))
- expect_equal(actual, list())
-
- # subtractByKey by an empty RDD
- rdd2 <- parallelize(sc, list())
- actual <- collectRDD(subtractByKey(rdd1, rdd2))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(l))
-
- rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
- actual <- collectRDD(subtractByKey(rdd1, rdd2))
- expect_equal(actual,
- list(list("b", 4), list("b", 5)))
-
- l <- list(list(1, 1), list(2, 4),
- list(2, 5), list(1, 2))
- rdd1 <- parallelize(sc, l)
- rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
- actual <- collectRDD(subtractByKey(rdd1, rdd2))
- expect_equal(actual,
- list(list(2, 4), list(2, 5)))
-})
-
-test_that("intersection() on RDDs", {
- # intersection with self
- actual <- collectRDD(intersection(rdd, rdd))
- expect_equal(sort(as.integer(actual)), nums)
-
- # intersection with an empty RDD
- emptyRdd <- parallelize(sc, list())
- actual <- collectRDD(intersection(rdd, emptyRdd))
- expect_equal(actual, list())
-
- rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
- rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
- actual <- collectRDD(intersection(rdd1, rdd2))
- expect_equal(sort(as.integer(actual)), 1:3)
-})
-
-test_that("join() on pairwise RDDs", {
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))
-
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
- rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
- expect_equal(actual, list())
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
- rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
- expect_equal(actual, list())
-})
-
-test_that("leftOuterJoin() on pairwise RDDs", {
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
- rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
- rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-})
-
-test_that("rightOuterJoin() on pairwise RDDs", {
- rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
- rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
- rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
- rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
- rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
-})
-
-test_that("fullOuterJoin() on pairwise RDDs", {
- rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
- rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
- actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
- list(2, list(NULL, 4)), list(3, list(3, NULL)))
- expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
- rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
- actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
- expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
- list("a", list(3, 1)), list("c", list(1, NULL)))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(expected))
-
- rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
- rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
- actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
- list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
-
- rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
- rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
- actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
- expect_equal(sortKeyValueList(actual),
- sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
- list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
-})
-
-test_that("sortByKey() on pairwise RDDs", {
- numPairsRdd <- map(rdd, function(x) { list (x, x) })
- sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
- actual <- collectRDD(sortedRdd)
- numPairs <- lapply(nums, function(x) { list (x, x) })
- expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
-
- rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
- numPairsRdd2 <- map(rdd2, function(x) { list (x, x) })
- sortedRdd2 <- sortByKey(numPairsRdd2)
- actual <- collectRDD(sortedRdd2)
- expect_equal(actual, numPairs)
-
- # sort by string keys
- l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
- rdd3 <- parallelize(sc, l, 2L)
- sortedRdd3 <- sortByKey(rdd3)
- actual <- collectRDD(sortedRdd3)
- expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
- # test on the boundary cases
-
- # boundary case 1: the RDD to be sorted has only 1 partition
- rdd4 <- parallelize(sc, l, 1L)
- sortedRdd4 <- sortByKey(rdd4)
- actual <- collectRDD(sortedRdd4)
- expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
- # boundary case 2: the sorted RDD has only 1 partition
- rdd5 <- parallelize(sc, l, 2L)
- sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
- actual <- collectRDD(sortedRdd5)
- expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))
-
- # boundary case 3: the RDD to be sorted has only 1 element
- l2 <- list(list("a", 1))
- rdd6 <- parallelize(sc, l2, 2L)
- sortedRdd6 <- sortByKey(rdd6)
- actual <- collectRDD(sortedRdd6)
- expect_equal(actual, l2)
-
- # boundary case 4: the RDD to be sorted has 0 element
- l3 <- list()
- rdd7 <- parallelize(sc, l3, 2L)
- sortedRdd7 <- sortByKey(rdd7)
- actual <- collectRDD(sortedRdd7)
- expect_equal(actual, l3)
-})
-
-test_that("collectAsMap() on a pairwise RDD", {
- rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
- vals <- collectAsMap(rdd)
- expect_equal(vals, list(`1` = 2, `3` = 4))
-
- rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
- vals <- collectAsMap(rdd)
- expect_equal(vals, list(a = 1, b = 2))
-
- rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
- vals <- collectAsMap(rdd)
- expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))
-
- rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
- vals <- collectAsMap(rdd)
- expect_equal(vals, list(`1` = "a", `2` = "b"))
-})
-
-test_that("show()", {
- rdd <- parallelize(sc, list(1:10))
- expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
-})
-
-test_that("sampleByKey() on pairwise RDDs", {
- rdd <- parallelize(sc, 1:2000)
- pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
- fractions <- list(a = 0.2, b = 0.1)
- sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
- expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE)
- expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE)
- expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE)
- expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE)
- expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE)
- expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE)
-
- rdd <- parallelize(sc, 1:2000)
- pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
- fractions <- list(`2` = 0.2, `3` = 0.1)
- sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
- expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE)
- expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE)
- expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE)
- expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE)
- expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
- expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
-})
-
-test_that("Test correct concurrency of RRDD.compute()", {
- rdd <- parallelize(sc, 1:1000, 100)
- jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row")
- zrdd <- callJMethod(jrdd, "zip", jrdd)
- count <- callJMethod(zrdd, "count")
- expect_equal(count, 1000)
-})
-
-sparkR.session.stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org