Posted to commits@spark.apache.org by fe...@apache.org on 2017/06/11 07:00:40 UTC
[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
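
For context: the relocated tests gate anything that is not CRAN-safe behind testthat's skip_on_cran(), which appears at the top of nearly every test_that() block in the hunks below. A minimal sketch of that pattern (the context name, test name, and expectation here are illustrative only, not part of this commit):

    library(testthat)

    context("CRAN-gated test example")

    test_that("expensive check runs only outside CRAN", {
      # skip_on_cran() skips the test unless the NOT_CRAN environment variable
      # is "true"; devtools-style local and CI runs set it, CRAN check machines do not.
      skip_on_cran()
      expect_equal(1 + 1, 2)
    })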
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R
deleted file mode 100644
index b20b431..0000000
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ /dev/null
@@ -1,167 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("Structured Streaming")
-
-# Tests for Structured Streaming functions in SparkR
-
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-
-jsonSubDir <- file.path("sparkr-test", "json", "")
-if (.Platform$OS.type == "windows") {
- # file.path removes the empty separator on Windows, adds it back
- jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
-}
-jsonDir <- file.path(tempdir(), jsonSubDir)
-dir.create(jsonDir, recursive = TRUE)
-
-mockLines <- c("{\"name\":\"Michael\"}",
- "{\"name\":\"Andy\", \"age\":30}",
- "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-writeLines(mockLines, jsonPath)
-
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
- "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
- "{\"name\":\"David\",\"age\":60,\"height\":null}")
-jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-
-schema <- structType(structField("name", "string"),
- structField("age", "integer"),
- structField("count", "double"))
-
-test_that("read.stream, write.stream, awaitTermination, stopQuery", {
- skip_on_cran()
-
- df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
- expect_true(isStreaming(df))
- counts <- count(group_by(df, "name"))
- q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
-
- expect_false(awaitTermination(q, 5 * 1000))
- callJMethod(q@ssq, "processAllAvailable")
- expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
-
- writeLines(mockLinesNa, jsonPathNa)
- awaitTermination(q, 5 * 1000)
- callJMethod(q@ssq, "processAllAvailable")
- expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
-
- stopQuery(q)
- expect_true(awaitTermination(q, 1))
- expect_error(awaitTermination(q), NA)
-})
-
-test_that("print from explain, lastProgress, status, isActive", {
- skip_on_cran()
-
- df <- read.stream("json", path = jsonDir, schema = schema)
- expect_true(isStreaming(df))
- counts <- count(group_by(df, "name"))
- q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
-
- awaitTermination(q, 5 * 1000)
- callJMethod(q@ssq, "processAllAvailable")
-
- expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
- expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
- expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
-
- expect_equal(queryName(q), "people2")
- expect_true(isActive(q))
-
- stopQuery(q)
-})
-
-test_that("Stream other format", {
- skip_on_cran()
-
- parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
- df <- read.df(jsonPath, "json", schema)
- write.df(df, parquetPath, "parquet", "overwrite")
-
- df <- read.stream(path = parquetPath, schema = schema)
- expect_true(isStreaming(df))
- counts <- count(group_by(df, "name"))
- q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
-
- expect_false(awaitTermination(q, 5 * 1000))
- callJMethod(q@ssq, "processAllAvailable")
- expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
-
- expect_equal(queryName(q), "people3")
- expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
- capture.output(lastProgress(q)))))
- expect_true(isActive(q))
-
- stopQuery(q)
- expect_true(awaitTermination(q, 1))
- expect_false(isActive(q))
-
- unlink(parquetPath)
-})
-
-test_that("Non-streaming DataFrame", {
- skip_on_cran()
-
- c <- as.DataFrame(cars)
- expect_false(isStreaming(c))
-
- expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
- paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
- "streaming Dataset/DataFrame).*"))
-})
-
-test_that("Unsupported operation", {
- skip_on_cran()
-
- # memory sink without aggregation
- df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
- expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
- paste0(".*(start : analysis error - Complete output mode not supported when there ",
- "are no streaming aggregations on streaming DataFrames/Datasets).*"))
-})
-
-test_that("Terminated by error", {
- skip_on_cran()
-
- df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1)
- counts <- count(group_by(df, "name"))
- # This would not fail before returning with a StreamingQuery,
- # but could dump error log at just about the same time
- expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"),
- NA)
-
- expect_error(awaitTermination(q, 5 * 1000),
- paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option",
- " 'maxFilesPerTrigger', must be a positive integer).*"))
-
- expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value",
- capture.output(status(q)))))
- expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q)))))
- expect_equal(queryName(q), "people4")
- expect_false(isActive(q))
-
- stopQuery(q)
-})
-
-unlink(jsonPath)
-unlink(jsonPathNa)
-
-sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_take.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_take.R b/R/pkg/inst/tests/testthat/test_take.R
deleted file mode 100644
index c00723b..0000000
--- a/R/pkg/inst/tests/testthat/test_take.R
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("tests RDD function take()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
- "violated, but I'm not. No, in fact, I think this is a friendly",
- "message, like \"Hey, wanna play?\" and yes, I want to play. ",
- "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
- "other times it helps me control the chaos.",
- "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
- "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-test_that("take() gives back the original elements in correct count and order", {
- skip_on_cran()
-
- numVectorRDD <- parallelize(sc, numVector, 10)
- # case: number of elements to take is less than the size of the first partition
- expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1)))
- # case: number of elements to take is the same as the size of the first partition
- expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11)))
- # case: number of elements to take is greater than all elements
- expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector))
- expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector))
-
- numListRDD <- parallelize(sc, numList, 1)
- numListRDD2 <- parallelize(sc, numList, 4)
- expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3))
- expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5))
- expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1)))
- expect_equal(takeRDD(numListRDD2, 999), numList)
-
- strVectorRDD <- parallelize(sc, strVector, 2)
- strVectorRDD2 <- parallelize(sc, strVector, 3)
- expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector))
- expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
-
- strListRDD <- parallelize(sc, strList, 4)
- strListRDD2 <- parallelize(sc, strList, 1)
- expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3)))
- expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1)))
-
- expect_equal(length(takeRDD(strListRDD, 0)), 0)
- expect_equal(length(takeRDD(strVectorRDD, 0)), 0)
- expect_equal(length(takeRDD(numListRDD, 0)), 0)
- expect_equal(length(takeRDD(numVectorRDD, 0)), 0)
-})
-
-sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_textFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_textFile.R b/R/pkg/inst/tests/testthat/test_textFile.R
deleted file mode 100644
index e8a961c..0000000
--- a/R/pkg/inst/tests/testthat/test_textFile.R
+++ /dev/null
@@ -1,182 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("the textFile() function")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("textFile() on a local file returns an RDD", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName)
- expect_is(rdd, "RDD")
- expect_true(countRDD(rdd) > 0)
- expect_equal(countRDD(rdd), 2)
-
- unlink(fileName)
-})
-
-test_that("textFile() followed by a collect() returns the same content", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName)
- expect_equal(collectRDD(rdd), as.list(mockFile))
-
- unlink(fileName)
-})
-
-test_that("textFile() word count works as expected", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName)
-
- words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
- wordCount <- lapply(words, function(word) { list(word, 1L) })
-
- counts <- reduceByKey(wordCount, "+", 2L)
- output <- collectRDD(counts)
- expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
- list("Spark", 2))
- expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
-
- unlink(fileName)
-})
-
-test_that("several transformations on RDD created by textFile()", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName) # RDD
- for (i in 1:10) {
- # PipelinedRDD initially created from RDD
- rdd <- lapply(rdd, function(x) paste(x, x))
- }
- collectRDD(rdd)
-
- unlink(fileName)
-})
-
-test_that("textFile() followed by a saveAsTextFile() returns the same content", {
- skip_on_cran()
-
- fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName1)
-
- rdd <- textFile(sc, fileName1, 1L)
- saveAsTextFile(rdd, fileName2)
- rdd <- textFile(sc, fileName2)
- expect_equal(collectRDD(rdd), as.list(mockFile))
-
- unlink(fileName1)
- unlink(fileName2)
-})
-
-test_that("saveAsTextFile() on a parallelized list works as expected", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- l <- list(1, 2, 3)
- rdd <- parallelize(sc, l, 1L)
- saveAsTextFile(rdd, fileName)
- rdd <- textFile(sc, fileName)
- expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)}))
-
- unlink(fileName)
-})
-
-test_that("textFile() and saveAsTextFile() word count works as expected", {
- skip_on_cran()
-
- fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName1)
-
- rdd <- textFile(sc, fileName1)
-
- words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
- wordCount <- lapply(words, function(word) { list(word, 1L) })
-
- counts <- reduceByKey(wordCount, "+", 2L)
-
- saveAsTextFile(counts, fileName2)
- rdd <- textFile(sc, fileName2)
-
- output <- collectRDD(rdd)
- expected <- list(list("awesome.", 1), list("Spark", 2),
- list("pretty.", 1), list("is", 2))
- expectedStr <- lapply(expected, function(x) { toString(x) })
- expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
-
- unlink(fileName1)
- unlink(fileName2)
-})
-
-test_that("textFile() on multiple paths", {
- skip_on_cran()
-
- fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines("Spark is pretty.", fileName1)
- writeLines("Spark is awesome.", fileName2)
-
- rdd <- textFile(sc, c(fileName1, fileName2))
- expect_equal(countRDD(rdd), 2)
-
- unlink(fileName1)
- unlink(fileName2)
-})
-
-test_that("Pipelined operations on RDDs created using textFile", {
- skip_on_cran()
-
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- rdd <- textFile(sc, fileName)
-
- lengths <- lapply(rdd, function(x) { length(x) })
- expect_equal(collectRDD(lengths), list(1, 1))
-
- lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
- expect_equal(collectRDD(lengthsPipelined), list(11, 11))
-
- lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
- expect_equal(collectRDD(lengths30), list(31, 31))
-
- lengths20 <- lapply(lengths, function(x) { x + 20 })
- expect_equal(collectRDD(lengths20), list(21, 21))
-
- unlink(fileName)
-})
-
-sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
deleted file mode 100644
index 6197ae7..0000000
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ /dev/null
@@ -1,248 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions in utils.R")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-test_that("convertJListToRList() gives back (deserializes) the original JLists
- of strings and integers", {
- skip_on_cran()
- # It's hard to manually create a Java List using rJava, since it does not
- # support generics well. Instead, we rely on collectRDD() returning a
- # JList.
- nums <- as.list(1:10)
- rdd <- parallelize(sc, nums, 1L)
- jList <- callJMethod(rdd@jrdd, "collect")
- rList <- convertJListToRList(jList, flatten = TRUE)
- expect_equal(rList, nums)
-
- strs <- as.list("hello", "spark")
- rdd <- parallelize(sc, strs, 2L)
- jList <- callJMethod(rdd@jrdd, "collect")
- rList <- convertJListToRList(jList, flatten = TRUE)
- expect_equal(rList, strs)
-})
-
-test_that("serializeToBytes on RDD", {
- skip_on_cran()
- # File content
- mockFile <- c("Spark is pretty.", "Spark is awesome.")
- fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
- writeLines(mockFile, fileName)
-
- text.rdd <- textFile(sc, fileName)
- expect_equal(getSerializedMode(text.rdd), "string")
- ser.rdd <- serializeToBytes(text.rdd)
- expect_equal(collectRDD(ser.rdd), as.list(mockFile))
- expect_equal(getSerializedMode(ser.rdd), "byte")
-
- unlink(fileName)
-})
-
-test_that("cleanClosure on R functions", {
- y <- c(1, 2, 3)
- g <- function(x) { x + 1 }
- f <- function(x) { g(x) + y }
- newF <- cleanClosure(f)
- env <- environment(newF)
- expect_equal(length(ls(env)), 2) # y, g
- actual <- get("y", envir = env, inherits = FALSE)
- expect_equal(actual, y)
- actual <- get("g", envir = env, inherits = FALSE)
- expect_equal(actual, g)
-
- # Test for nested enclosures and package variables.
- env2 <- new.env()
- funcEnv <- new.env(parent = env2)
- f <- function(x) { log(g(x) + y) }
- environment(f) <- funcEnv # enclosing relationship: f -> funcEnv -> env2 -> .GlobalEnv
- newF <- cleanClosure(f)
- env <- environment(newF)
- expect_equal(length(ls(env)), 2) # "min" should not be included
- actual <- get("y", envir = env, inherits = FALSE)
- expect_equal(actual, y)
- actual <- get("g", envir = env, inherits = FALSE)
- expect_equal(actual, g)
-
- base <- c(1, 2, 3)
- l <- list(field = matrix(1))
- field <- matrix(2)
- defUse <- 3
- g <- function(x) { x + y }
- f <- function(x) {
- defUse <- base::as.integer(x) + 1 # Test for access operators `::`.
- lapply(x, g) + 1 # Test for capturing function call "g"'s closure as an argument of lapply.
- l$field[1, 1] <- 3 # Test for access operators `$`.
- res <- defUse + l$field[1, ] # Test for def-use chain of "defUse", and "" symbol.
- f(res) # Test for recursive calls.
- }
- newF <- cleanClosure(f)
- env <- environment(newF)
- # TODO(shivaram): length(ls(env)) is 4 here for some reason and `lapply` is included in `env`.
- # Disabling this test till we debug this.
- #
- # nolint start
- # expect_equal(length(ls(env)), 3) # Only "g", "l" and "f". No "base", "field" or "defUse".
- # nolint end
- expect_true("g" %in% ls(env))
- expect_true("l" %in% ls(env))
- expect_true("f" %in% ls(env))
- expect_equal(get("l", envir = env, inherits = FALSE), l)
- # "y" should be in the environemnt of g.
- newG <- get("g", envir = env, inherits = FALSE)
- env <- environment(newG)
- expect_equal(length(ls(env)), 1)
- actual <- get("y", envir = env, inherits = FALSE)
- expect_equal(actual, y)
-
- # Test for function (and variable) definitions.
- f <- function(x) {
- g <- function(y) { y * 2 }
- g(x)
- }
- newF <- cleanClosure(f)
- env <- environment(newF)
- expect_equal(length(ls(env)), 0) # "y" and "g" should not be included.
-
- # Test for overriding variables in base namespace (Issue: SparkR-196).
- nums <- as.list(1:10)
- rdd <- parallelize(sc, nums, 2L)
- t <- 4 # Override base::t in .GlobalEnv.
- f <- function(x) { x > t }
- newF <- cleanClosure(f)
- env <- environment(newF)
- expect_equal(ls(env), "t")
- expect_equal(get("t", envir = env, inherits = FALSE), t)
- actual <- collectRDD(lapply(rdd, f))
- expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
- expect_equal(actual, expected)
-
- # Test for broadcast variables.
- a <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
- aBroadcast <- broadcastRDD(sc, a)
- normMultiply <- function(x) { norm(aBroadcast$value) * x }
- newnormMultiply <- SparkR:::cleanClosure(normMultiply)
- env <- environment(newnormMultiply)
- expect_equal(ls(env), "aBroadcast")
- expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
-})
-
-test_that("varargsToJProperties", {
- jprops <- newJObject("java.util.Properties")
- expect_true(class(jprops) == "jobj")
-
- jprops <- varargsToJProperties(abc = "123")
- expect_true(class(jprops) == "jobj")
- expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")
-
- jprops <- varargsToJProperties(abc = "abc", b = 1)
- expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
- expect_equal(callJMethod(jprops, "getProperty", "b"), "1")
-
- jprops <- varargsToJProperties()
- expect_equal(callJMethod(jprops, "size"), 0L)
-})
-
-test_that("convertToJSaveMode", {
- s <- convertToJSaveMode("error")
- expect_true(class(s) == "jobj")
- expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
- expect_error(convertToJSaveMode("foo"),
- 'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
-})
-
-test_that("captureJVMException", {
- skip_on_cran()
-
- method <- "createStructField"
- expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,
- "col", "unknown", TRUE),
- error = function(e) {
- captureJVMException(e, method)
- }),
- "parse error - .*DataType unknown.*not supported.")
-})
-
-test_that("hashCode", {
- skip_on_cran()
-
- expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA)
-})
-
-test_that("overrideEnvs", {
- config <- new.env()
- config[["spark.master"]] <- "foo"
- config[["config_only"]] <- "ok"
- param <- new.env()
- param[["spark.master"]] <- "local"
- param[["param_only"]] <- "blah"
- overrideEnvs(config, param)
- expect_equal(config[["spark.master"]], "local")
- expect_equal(config[["param_only"]], "blah")
- expect_equal(config[["config_only"]], "ok")
-})
-
-test_that("rbindRaws", {
-
- # Mixed Column types
- r <- serialize(1:5, connection = NULL)
- r1 <- serialize(1, connection = NULL)
- r2 <- serialize(letters, connection = NULL)
- r3 <- serialize(1:10, connection = NULL)
- inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r),
- list(3L, r3, "c", r))
- expected <- data.frame(V1 = 1:3)
- expected$V2 <- list(r1, r2, r3)
- expected$V3 <- c("a", "b", "c")
- expected$V4 <- list(r, r, r)
- result <- rbindRaws(inputData)
- expect_equal(expected, result)
-
- # Single binary column
- input <- list(list(r1), list(r2), list(r3))
- expected <- subset(expected, select = "V2")
- result <- setNames(rbindRaws(input), "V2")
- expect_equal(expected, result)
-
-})
-
-test_that("varargsToStrEnv", {
- strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
- env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
- expect_equal(strenv, env)
- expect_error(varargsToStrEnv(a = list(1, "a")),
- paste0("Unsupported type for a : list. Supported types are logical, ",
- "numeric, character and NULL."))
- expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.")
- expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
-})
-
-test_that("basenameSansExtFromUrl", {
- x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-",
- "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
- expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7")
- z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz"
- expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
-})
-
-sparkR.session.stop()
-
-message("--- End test (utils) ", as.POSIXct(Sys.time(), tz = "GMT"))
-message("elapsed ", (proc.time() - timer_ptm)[3])
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/jarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/jarTest.R b/R/pkg/tests/fulltests/jarTest.R
new file mode 100644
index 0000000..e2241e0
--- /dev/null
+++ b/R/pkg/tests/fulltests/jarTest.R
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+
+sc <- sparkR.session(master = "local[1]")
+
+helloTest <- SparkR:::callJStatic("sparkrtest.DummyClass",
+ "helloWorld",
+ "Dave")
+stopifnot(identical(helloTest, "Hello Dave"))
+
+basicFunction <- SparkR:::callJStatic("sparkrtest.DummyClass",
+ "addStuff",
+ 2L,
+ 2L)
+stopifnot(basicFunction == 4L)
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/packageInAJarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/packageInAJarTest.R b/R/pkg/tests/fulltests/packageInAJarTest.R
new file mode 100644
index 0000000..ac70626
--- /dev/null
+++ b/R/pkg/tests/fulltests/packageInAJarTest.R
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sparkR.session(master = "local[1]")
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.session.stop()
+
+if (run1 != 6) quit(save = "no", status = 1)
+
+if (run2 != -3) quit(save = "no", status = 1)
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_Serde.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R
new file mode 100644
index 0000000..6e160fa
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_Serde.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("SerDe functionality")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("SerDe of primitive types", {
+ skip_on_cran()
+
+ x <- callJStatic("SparkRHandler", "echo", 1L)
+ expect_equal(x, 1L)
+ expect_equal(class(x), "integer")
+
+ x <- callJStatic("SparkRHandler", "echo", 1)
+ expect_equal(x, 1)
+ expect_equal(class(x), "numeric")
+
+ x <- callJStatic("SparkRHandler", "echo", TRUE)
+ expect_true(x)
+ expect_equal(class(x), "logical")
+
+ x <- callJStatic("SparkRHandler", "echo", "abc")
+ expect_equal(x, "abc")
+ expect_equal(class(x), "character")
+})
+
+test_that("SerDe of list of primitive types", {
+ skip_on_cran()
+
+ x <- list(1L, 2L, 3L)
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+ expect_equal(class(y[[1]]), "integer")
+
+ x <- list(1, 2, 3)
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+ expect_equal(class(y[[1]]), "numeric")
+
+ x <- list(TRUE, FALSE)
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+ expect_equal(class(y[[1]]), "logical")
+
+ x <- list("a", "b", "c")
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+ expect_equal(class(y[[1]]), "character")
+
+ # Empty list
+ x <- list()
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+})
+
+test_that("SerDe of list of lists", {
+ skip_on_cran()
+
+ x <- list(list(1L, 2L, 3L), list(1, 2, 3),
+ list(TRUE, FALSE), list("a", "b", "c"))
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+
+ # List of empty lists
+ x <- list(list(), list())
+ y <- callJStatic("SparkRHandler", "echo", x)
+ expect_equal(x, y)
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_Windows.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_Windows.R b/R/pkg/tests/fulltests/test_Windows.R
new file mode 100644
index 0000000..00d684e
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_Windows.R
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+context("Windows-specific tests")
+
+test_that("sparkJars tag in SparkContext", {
+ skip_on_cran()
+
+ if (.Platform$OS.type != "windows") {
+ skip("This test is only for Windows, skipped")
+ }
+
+ testOutput <- launchScript("ECHO", "a/b/c", wait = TRUE)
+ abcPath <- testOutput[1]
+ expect_equal(abcPath, "a\\b\\c")
+})
+
+message("--- End test (Windows) ", as.POSIXct(Sys.time(), tz = "GMT"))
+message("elapsed ", (proc.time() - timer_ptm)[3])
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_binaryFile.R b/R/pkg/tests/fulltests/test_binaryFile.R
new file mode 100644
index 0000000..00954fa
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_binaryFile.R
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions on binary files")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+ skip_on_cran()
+
+ fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1, 1)
+ saveAsObjectFile(rdd, fileName2)
+ rdd <- objectFile(sc, fileName2)
+ expect_equal(collectRDD(rdd), as.list(mockFile))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+ skip_on_cran()
+
+ fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+
+ l <- list(1, 2, 3)
+ rdd <- parallelize(sc, l, 1)
+ saveAsObjectFile(rdd, fileName)
+ rdd <- objectFile(sc, fileName)
+ expect_equal(collectRDD(rdd), l)
+
+ unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+ skip_on_cran()
+
+ fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ writeLines(mockFile, fileName1)
+
+ rdd <- textFile(sc, fileName1)
+
+ words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+ wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+ counts <- reduceByKey(wordCount, "+", 2L)
+
+ saveAsObjectFile(counts, fileName2)
+ counts <- objectFile(sc, fileName2)
+
+ output <- collectRDD(counts)
+ expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+ list("is", 2))
+ expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+ unlink(fileName1)
+ unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+ skip_on_cran()
+
+ fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+
+ rdd1 <- parallelize(sc, "Spark is pretty.")
+ saveAsObjectFile(rdd1, fileName1)
+ rdd2 <- parallelize(sc, "Spark is awesome.")
+ saveAsObjectFile(rdd2, fileName2)
+
+ rdd <- objectFile(sc, c(fileName1, fileName2))
+ expect_equal(countRDD(rdd), 2)
+
+ unlink(fileName1, recursive = TRUE)
+ unlink(fileName2, recursive = TRUE)
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_binary_function.R b/R/pkg/tests/fulltests/test_binary_function.R
new file mode 100644
index 0000000..236cb38
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_binary_function.R
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("binary functions")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+# File content
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("union on two RDDs", {
+ skip_on_cran()
+
+ actual <- collectRDD(unionRDD(rdd, rdd))
+ expect_equal(actual, as.list(rep(nums, 2)))
+
+ fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ writeLines(mockFile, fileName)
+
+ text.rdd <- textFile(sc, fileName)
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collectRDD(union.rdd)
+ expect_equal(actual, c(as.list(nums), mockFile))
+ expect_equal(getSerializedMode(union.rdd), "byte")
+
+ rdd <- map(text.rdd, function(x) {x})
+ union.rdd <- unionRDD(rdd, text.rdd)
+ actual <- collectRDD(union.rdd)
+ expect_equal(actual, as.list(c(mockFile, mockFile)))
+ expect_equal(getSerializedMode(union.rdd), "byte")
+
+ unlink(fileName)
+})
+
+test_that("cogroup on two RDDs", {
+ skip_on_cran()
+
+ rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+ rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+ actual <- collectRDD(cogroup.rdd)
+ expect_equal(actual,
+ list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
+
+ rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
+ rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
+ cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+ actual <- collectRDD(cogroup.rdd)
+
+ expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
+ expect_equal(sortKeyValueList(actual),
+ sortKeyValueList(expected))
+})
+
+test_that("zipPartitions() on RDDs", {
+ skip_on_cran()
+
+ rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
+ rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
+ rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
+ actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3,
+ func = function(x, y, z) { list(list(x, y, z))} ))
+ expect_equal(actual,
+ list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))
+
+ mockFile <- c("Spark is pretty.", "Spark is awesome.")
+ fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+ writeLines(mockFile, fileName)
+
+ rdd <- textFile(sc, fileName, 1)
+ actual <- collectRDD(zipPartitions(rdd, rdd,
+ func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+ expected <- list(paste(mockFile, mockFile, sep = "\n"))
+ expect_equal(actual, expected)
+
+ rdd1 <- parallelize(sc, 0:1, 1)
+ actual <- collectRDD(zipPartitions(rdd1, rdd,
+ func = function(x, y) { list(x + nchar(y)) }))
+ expected <- list(0:1 + nchar(mockFile))
+ expect_equal(actual, expected)
+
+ rdd <- map(rdd, function(x) { x })
+ actual <- collectRDD(zipPartitions(rdd, rdd1,
+ func = function(x, y) { list(y + nchar(x)) }))
+ expect_equal(actual, expected)
+
+ unlink(fileName)
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_broadcast.R b/R/pkg/tests/fulltests/test_broadcast.R
new file mode 100644
index 0000000..2c96740
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_broadcast.R
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("broadcast variables")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Partitioned data
+nums <- 1:2
+rrdd <- parallelize(sc, nums, 2L)
+
+test_that("using broadcast variable", {
+ skip_on_cran()
+
+ randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
+ randomMatBr <- broadcastRDD(sc, randomMat)
+
+ useBroadcast <- function(x) {
+ sum(SparkR:::value(randomMatBr) * x)
+ }
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
+
+test_that("without using broadcast variable", {
+ skip_on_cran()
+
+ randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
+
+ useBroadcast <- function(x) {
+ sum(randomMat * x)
+ }
+ actual <- collectRDD(lapply(rrdd, useBroadcast))
+ expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+ expect_equal(actual, expected)
+})
+
+sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_client.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_client.R b/R/pkg/tests/fulltests/test_client.R
new file mode 100644
index 0000000..3d53beb
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_client.R
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in client.R")
+
+test_that("adding spark-testing-base as a package works", {
+ skip_on_cran()
+
+ args <- generateSparkSubmitArgs("", "", "", "",
+ "holdenk:spark-testing-base:1.3.0_0.0.5")
+ expect_equal(gsub("[[:space:]]", "", args),
+ gsub("[[:space:]]", "",
+ "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
+})
+
+test_that("no package specified doesn't add packages flag", {
+ skip_on_cran()
+
+ args <- generateSparkSubmitArgs("", "", "", "", "")
+ expect_equal(gsub("[[:space:]]", "", args),
+ "")
+})
+
+test_that("multiple packages don't produce a warning", {
+ skip_on_cran()
+
+ expect_warning(generateSparkSubmitArgs("", "", "", "", c("A", "B")), NA)
+})
+
+test_that("sparkJars sparkPackages as character vectors", {
+ skip_on_cran()
+
+ args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
+ c("com.databricks:spark-avro_2.10:2.0.1"))
+ expect_match(args, "--jars one.jar,two.jar,three.jar")
+ expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1")
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R
new file mode 100644
index 0000000..f6d9f54
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -0,0 +1,226 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("test functions in sparkR.R")
+
+test_that("Check masked functions", {
+ skip_on_cran()
+
+ # Check that we are not masking any new function from base, stats, testthat unexpectedly
+ # NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it
+ # hard for users to use base R functions. Please check when in doubt.
+ namesOfMaskedCompletely <- c("cov", "filter", "sample", "not")
+ namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
+ "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
+ "summary", "transform", "drop", "window", "as.data.frame", "union", "not")
+ if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+ namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
+ }
+ masked <- conflicts(detail = TRUE)$`package:SparkR`
+ expect_true("describe" %in% masked) # only when with testthat..
+ func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] })
+ funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func)
+ maskedBySparkR <- masked[funcSparkROrEmpty]
+ expect_equal(length(maskedBySparkR), length(namesOfMasked))
+ # make the 2 lists the same length so expect_equal will print their content
+ l <- max(length(maskedBySparkR), length(namesOfMasked))
+ length(maskedBySparkR) <- l
+ length(namesOfMasked) <- l
+ expect_equal(sort(maskedBySparkR, na.last = TRUE), sort(namesOfMasked, na.last = TRUE))
+ # above are those reported as masked when `library(SparkR)`
+ # note that many of these methods are still callable without base:: or stats:: prefix
+ # there should be a test for each of these, except followings, which are currently "broken"
+ funcHasAny <- unlist(lapply(masked, function(x) {
+ any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
+ }))
+ maskedCompletely <- masked[!funcHasAny]
+ expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
+ l <- max(length(maskedCompletely), length(namesOfMaskedCompletely))
+ length(maskedCompletely) <- l
+ length(namesOfMaskedCompletely) <- l
+ expect_equal(sort(maskedCompletely, na.last = TRUE),
+ sort(namesOfMaskedCompletely, na.last = TRUE))
+})
+
+test_that("repeatedly starting and stopping SparkR", {
+ skip_on_cran()
+
+ for (i in 1:4) {
+ sc <- suppressWarnings(sparkR.init(master = sparkRTestMaster))
+ rdd <- parallelize(sc, 1:20, 2L)
+ expect_equal(countRDD(rdd), 20)
+ suppressWarnings(sparkR.stop())
+ }
+})
+
+test_that("repeatedly starting and stopping SparkSession", {
+ for (i in 1:4) {
+ sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+ df <- createDataFrame(data.frame(dummy = 1:i))
+ expect_equal(count(df), i)
+ sparkR.session.stop()
+ }
+})
+
+test_that("rdd GC across sparkR.stop", {
+ skip_on_cran()
+
+ sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0
+ rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
+ rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
+ sparkR.session.stop()
+
+ sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0 again
+
+ # GC rdd1 before creating rdd3 and rdd2 after
+ rm(rdd1)
+ gc()
+
+ rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
+ rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
+
+ rm(rdd2)
+ gc()
+
+ countRDD(rdd3)
+ countRDD(rdd4)
+ sparkR.session.stop()
+})
+
+test_that("job group functions can be called", {
+ skip_on_cran()
+
+ sc <- sparkR.sparkContext(master = sparkRTestMaster)
+ setJobGroup("groupId", "job description", TRUE)
+ cancelJobGroup("groupId")
+ clearJobGroup()
+
+ suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
+ suppressWarnings(cancelJobGroup(sc, "groupId"))
+ suppressWarnings(clearJobGroup(sc))
+ sparkR.session.stop()
+})
+
+test_that("utility function can be called", {
+ skip_on_cran()
+
+ sparkR.sparkContext(master = sparkRTestMaster)
+ setLogLevel("ERROR")
+ sparkR.session.stop()
+})
+
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+ skip_on_cran()
+
+ e <- new.env()
+ e[["spark.driver.memory"]] <- "512m"
+ ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
+ expect_equal("--driver-memory \"512m\" sparkrmain", ops)
+
+ e[["spark.driver.memory"]] <- "5g"
+ e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
+ e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
+ e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
+ e[["random"]] <- "skipthis"
+ ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
+ # nolint start
+ expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
+ "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
+ "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
+ # nolint end
+
+ e[["spark.driver.extraClassPath"]] <- "/" # too short
+ ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
+ # nolint start
+ expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
+ "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
+ " --driver-memory 4g sparkr-shell2"))
+ # nolint end
+})
+
+test_that("sparkJars sparkPackages as comma-separated strings", {
+ skip_on_cran()
+
+ expect_warning(processSparkJars(" a, b "))
+ jars <- suppressWarnings(processSparkJars(" a, b "))
+ expect_equal(lapply(jars, basename), list("a", "b"))
+
+ jars <- suppressWarnings(processSparkJars(" abc ,, def "))
+ expect_equal(lapply(jars, basename), list("abc", "def"))
+
+ jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
+ expect_equal(lapply(jars, basename), list("abc", "def", "xyz", "a", "b"))
+
+ p <- processSparkPackages(c("ghi", "lmn"))
+ expect_equal(p, c("ghi", "lmn"))
+
+ # check normalizePath
+ f <- dir()[[1]]
+ expect_warning(processSparkJars(f), NA)
+ expect_match(processSparkJars(f), f)
+})
+
+test_that("spark.lapply should perform simple transforms", {
+ sparkR.sparkContext(master = sparkRTestMaster)
+ doubled <- spark.lapply(1:10, function(x) { 2 * x })
+ expect_equal(doubled, as.list(2 * 1:10))
+ sparkR.session.stop()
+})
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+ skip_on_cran()
+
+ sparkR.sparkContext(master = sparkRTestMaster)
+ # Test add file.
+ path <- tempfile(pattern = "hello", fileext = ".txt")
+ filename <- basename(path)
+ words <- "Hello World!"
+ writeLines(words, path)
+ spark.addFile(path)
+ download_path <- spark.getSparkFiles(filename)
+ expect_equal(readLines(download_path), words)
+
+ # Test spark.getSparkFiles works well on executors.
+ seq <- seq(from = 1, to = 10, length.out = 5)
+ f <- function(seq) { spark.getSparkFiles(filename) }
+ results <- spark.lapply(seq, f)
+ for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
+
+ unlink(path)
+
+ # Test add directory recursively.
+ path <- paste0(tempdir(), "/", "recursive_dir")
+ dir.create(path)
+ dir_name <- basename(path)
+ path1 <- paste0(path, "/", "hello.txt")
+ file.create(path1)
+ sub_path <- paste0(path, "/", "sub_hello")
+ dir.create(sub_path)
+ path2 <- paste0(sub_path, "/", "sub_hello.txt")
+ file.create(path2)
+ words <- "Hello World!"
+ sub_words <- "Sub Hello World!"
+ writeLines(words, path1)
+ writeLines(sub_words, path2)
+ spark.addFile(path, recursive = TRUE)
+ download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt"))
+ expect_equal(readLines(download_path1), words)
+ download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt"))
+ expect_equal(readLines(download_path2), sub_words)
+ unlink(path, recursive = TRUE)
+ sparkR.session.stop()
+})
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_includePackage.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_includePackage.R b/R/pkg/tests/fulltests/test_includePackage.R
new file mode 100644
index 0000000..d7d9eee
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_includePackage.R
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("include R packages")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Partitioned data
+nums <- 1:2
+rdd <- parallelize(sc, nums, 2L)
+
+test_that("include inside function", {
+ skip_on_cran()
+
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ suppressPackageStartupMessages(library(plyr))
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collectRDD(data)
+ }
+})
+
+test_that("use include package", {
+ skip_on_cran()
+
+ # Only run the test if plyr is installed.
+ if ("plyr" %in% rownames(installed.packages())) {
+ suppressPackageStartupMessages(library(plyr))
+ generateData <- function(x) {
+ attach(airquality)
+ result <- transform(Ozone, logOzone = log(Ozone))
+ result
+ }
+
+ includePackage(sc, plyr)
+ data <- lapplyPartition(rdd, generateData)
+ actual <- collectRDD(data)
+ }
+})
+
+sparkR.session.stop()
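
The includePackage() call tested above arranges for library(plyr) to run on
each executor before partition functions execute. A minimal sketch, assuming
the SparkR-internal RDD helpers used by this test (parallelize,
lapplyPartition, collectRDD) are available as they are in the test
environment:

  sparkSession <- sparkR.session()
  sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext",
                    sparkSession)
  includePackage(sc, plyr)                        # attach plyr on the workers
  rdd <- parallelize(sc, 1:4, 2L)
  out <- collectRDD(lapplyPartition(rdd, function(part) {
    # plyr is already attached here because of includePackage()
    ldply(part, function(x) data.frame(value = x))
  }))
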
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_jvm_api.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_jvm_api.R b/R/pkg/tests/fulltests/test_jvm_api.R
new file mode 100644
index 0000000..8b3b4f7
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_jvm_api.R
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("JVM API")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("Create and call methods on object", {
+ jarr <- sparkR.newJObject("java.util.ArrayList")
+ # Add an element to the array
+ sparkR.callJMethod(jarr, "add", 1L)
+ # Check if get returns the same element
+ expect_equal(sparkR.callJMethod(jarr, "get", 0L), 1L)
+})
+
+test_that("Call static methods", {
+ # Convert a boolean to a string
+ strTrue <- sparkR.callJStatic("java.lang.String", "valueOf", TRUE)
+ expect_equal(strTrue, "true")
+})
+
+sparkR.session.stop()
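
The public JVM bridge covered above has three entry points. A hedged sketch of
typical use; the JDK classes and methods here are standard examples rather
than anything SparkR-specific:

  sparkR.session()
  jl <- sparkR.newJObject("java.util.ArrayList")             # construct a JVM object
  sparkR.callJMethod(jl, "add", "first")                     # call an instance method
  sparkR.callJMethod(jl, "size")                             # returns 1L
  sparkR.callJStatic("java.lang.Integer", "parseInt", "42")  # static call, returns 42L
  sparkR.session.stop()
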
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_classification.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R
new file mode 100644
index 0000000..82e588d
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_classification.R
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib classification algorithms, except for tree-based algorithms")
+
+# Tests for MLlib classification algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+absoluteSparkPath <- function(x) {
+ sparkHome <- sparkR.conf("spark.home")
+ file.path(sparkHome, x)
+}
+
+test_that("spark.svmLinear", {
+ skip_on_cran()
+
+ df <- suppressWarnings(createDataFrame(iris))
+ training <- df[df$Species %in% c("versicolor", "virginica"), ]
+ model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
+ summary <- summary(model)
+
+ # Test that summary coefficients are returned as a matrix
+ expect_true(class(summary$coefficients) == "matrix")
+ expect_true(class(summary$coefficients[, 1]) == "numeric")
+
+ coefs <- summary$coefficients[, "Estimate"]
+ expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085)
+ expect_true(all(abs(coefs - expected_coefs) < 0.1))
+
+ # Test prediction with string label
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+ expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica",
+ "virginica", "virginica", "virginica", "virginica", "virginica")
+ expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
+
+ # Test model save and load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ coefs <- summary(model)$coefficients
+ coefs2 <- summary(model2)$coefficients
+ expect_equal(coefs, coefs2)
+ unlink(modelPath)
+ }
+
+ # Test prediction with numeric label
+ label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+ feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+ data <- as.data.frame(cbind(label, feature))
+ df <- createDataFrame(data)
+ model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
+ prediction <- collect(select(predict(model, df), "prediction"))
+ expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+})
+
+test_that("spark.logit", {
+ # R code to reproduce the result.
+ # nolint start
+ #' library(glmnet)
+ #' iris.x = as.matrix(iris[, 1:4])
+ #' iris.y = as.factor(as.character(iris[, 5]))
+ #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
+ #' coef(logit)
+ #
+ # $setosa
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # 1.0981324
+ # Sepal.Length -0.2909860
+ # Sepal.Width 0.5510907
+ # Petal.Length -0.1915217
+ # Petal.Width -0.4211946
+ #
+ # $versicolor
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # 1.520061e+00
+ # Sepal.Length 2.524501e-02
+ # Sepal.Width -5.310313e-01
+ # Petal.Length 3.656543e-02
+ # Petal.Width -3.144464e-05
+ #
+ # $virginica
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # -2.61819385
+ # Sepal.Length 0.26574097
+ # Sepal.Width -0.02005932
+ # Petal.Length 0.15495629
+ # Petal.Width 0.42122607
+ # nolint end
+
+ # Test multinomial logistic regression against three classes
+ df <- suppressWarnings(createDataFrame(iris))
+ model <- spark.logit(df, Species ~ ., regParam = 0.5)
+ summary <- summary(model)
+
+ # Test that summary coefficients are returned as a matrix
+ expect_true(class(summary$coefficients) == "matrix")
+ expect_true(class(summary$coefficients[, 1]) == "numeric")
+
+ versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
+ virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42)
+ setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42)
+ versicolorCoefs <- summary$coefficients[, "versicolor"]
+ virginicaCoefs <- summary$coefficients[, "virginica"]
+ setosaCoefs <- summary$coefficients[, "setosa"]
+ expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
+ expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
+ expect_true(all(abs(setosaCoefsR - setosaCoefs) < 0.1))
+
+ # Test model save and load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ coefs <- summary(model)$coefficients
+ coefs2 <- summary(model2)$coefficients
+ expect_equal(coefs, coefs2)
+ unlink(modelPath)
+ }
+
+ # R code to reproduce the result.
+ # nolint start
+ #' library(glmnet)
+ #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ]
+ #' iris.x = as.matrix(iris2[, 1:4])
+ #' iris.y = as.factor(as.character(iris2[, 5]))
+ #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
+ #' coef(logit)
+ #
+ # $versicolor
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # 3.93844796
+ # Sepal.Length -0.13538675
+ # Sepal.Width -0.02386443
+ # Petal.Length -0.35076451
+ # Petal.Width -0.77971954
+ #
+ # $virginica
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # -3.93844796
+ # Sepal.Length 0.13538675
+ # Sepal.Width 0.02386443
+ # Petal.Length 0.35076451
+ # Petal.Width 0.77971954
+ #
+ #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5)
+ #' coef(logit)
+ #
+ # 5 x 1 sparse Matrix of class "dgCMatrix"
+ # s0
+ # (Intercept) -6.0824412
+ # Sepal.Length 0.2458260
+ # Sepal.Width 0.1642093
+ # Petal.Length 0.4759487
+ # Petal.Width 1.0383948
+ #
+ # nolint end
+
+ # Test multinomial logistic regression against two classes
+ df <- suppressWarnings(createDataFrame(iris))
+ training <- df[df$Species %in% c("versicolor", "virginica"), ]
+ model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
+ summary <- summary(model)
+ versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78)
+ virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78)
+ versicolorCoefs <- summary$coefficients[, "versicolor"]
+ virginicaCoefs <- summary$coefficients[, "virginica"]
+ expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
+ expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
+
+ # Test binomial logistic regression against two classes
+ model <- spark.logit(training, Species ~ ., regParam = 0.5)
+ summary <- summary(model)
+ coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
+ coefs <- summary$coefficients[, "Estimate"]
+ expect_true(all(abs(coefsR - coefs) < 0.1))
+
+ # Test prediction with string label
+ prediction <- predict(model, training)
+ expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+ expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor",
+ "versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
+ expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
+
+ # Test prediction with numeric label
+ label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+ feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+ data <- as.data.frame(cbind(label, feature))
+ df <- createDataFrame(data)
+ model <- spark.logit(df, label ~ feature)
+ prediction <- collect(select(predict(model, df), "prediction"))
+ expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+ # Test prediction with weightCol
+ weight <- c(2.0, 2.0, 2.0, 1.0, 1.0)
+ data2 <- as.data.frame(cbind(label, feature, weight))
+ df2 <- createDataFrame(data2)
+ model2 <- spark.logit(df2, label ~ feature, weightCol = "weight")
+ prediction2 <- collect(select(predict(model2, df2), "prediction"))
+ expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))
+})
+
+test_that("spark.mlp", {
+ skip_on_cran()
+
+ df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+ source = "libsvm")
+ model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
+ solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
+
+ # Test summary method
+ summary <- summary(model)
+ expect_equal(summary$numOfInputs, 4)
+ expect_equal(summary$numOfOutputs, 3)
+ expect_equal(summary$layers, c(4, 5, 4, 3))
+ expect_equal(length(summary$weights), 64)
+ expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
+ tolerance = 1e-6)
+
+ # Test predict method
+ mlpTestDF <- df
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ summary2 <- summary(model2)
+
+ expect_equal(summary2$numOfInputs, 4)
+ expect_equal(summary2$numOfOutputs, 3)
+ expect_equal(summary2$layers, c(4, 5, 4, 3))
+ expect_equal(length(summary2$weights), 64)
+
+ unlink(modelPath)
+ }
+
+ # Test default parameter
+ model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 10),
+ c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+ # Test illegal parameter
+ expect_error(spark.mlp(df, label ~ features, layers = NULL),
+ "layers must be a integer vector with length > 1.")
+ expect_error(spark.mlp(df, label ~ features, layers = c()),
+ "layers must be a integer vector with length > 1.")
+ expect_error(spark.mlp(df, label ~ features, layers = c(3)),
+ "layers must be a integer vector with length > 1.")
+
+ # Test random seed
+ # default seed
+ model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 10),
+ c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+ # seed equals 10
+ model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 10),
+ c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+ # test initialWeights
+ model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
+ c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
+ mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+ expect_equal(head(mlpPredictions$prediction, 10),
+ c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+ # Test formula works well
+ df <- suppressWarnings(createDataFrame(iris))
+ model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
+ layers = c(4, 3))
+ summary <- summary(model)
+ expect_equal(summary$numOfInputs, 4)
+ expect_equal(summary$numOfOutputs, 3)
+ expect_equal(summary$layers, c(4, 3))
+ expect_equal(length(summary$weights), 15)
+})
+
+test_that("spark.naiveBayes", {
+ # R code to reproduce the result.
+ # We do not support instance weights yet, so we ignore the frequencies.
+ #
+ #' library(e1071)
+ #' t <- as.data.frame(Titanic)
+ #' t1 <- t[t$Freq > 0, -5]
+ #' m <- naiveBayes(Survived ~ ., data = t1)
+ #' m
+ #' predict(m, t1)
+ #
+ # -- output of 'm'
+ #
+ # A-priori probabilities:
+ # Y
+ # No Yes
+ # 0.4166667 0.5833333
+ #
+ # Conditional probabilities:
+ # Class
+ # Y 1st 2nd 3rd Crew
+ # No 0.2000000 0.2000000 0.4000000 0.2000000
+ # Yes 0.2857143 0.2857143 0.2857143 0.1428571
+ #
+ # Sex
+ # Y Male Female
+ # No 0.5 0.5
+ # Yes 0.5 0.5
+ #
+ # Age
+ # Y Child Adult
+ # No 0.2000000 0.8000000
+ # Yes 0.4285714 0.5714286
+ #
+ # -- output of 'predict(m, t1)'
+ #
+ # Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No
+ #
+
+ t <- as.data.frame(Titanic)
+ t1 <- t[t$Freq > 0, -5]
+ df <- suppressWarnings(createDataFrame(t1))
+ m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0)
+ s <- summary(m)
+ expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6)
+ expect_equal(sum(s$apriori), 1)
+ expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6)
+ p <- collect(select(predict(m, df), "prediction"))
+ expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No",
+ "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No",
+ "Yes", "Yes", "No", "No"))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
+ write.ml(m, modelPath)
+ expect_error(write.ml(m, modelPath))
+ write.ml(m, modelPath, overwrite = TRUE)
+ m2 <- read.ml(modelPath)
+ s2 <- summary(m2)
+ expect_equal(s$apriori, s2$apriori)
+ expect_equal(s$tables, s2$tables)
+
+ unlink(modelPath)
+ }
+
+ # Test e1071::naiveBayes
+ if (requireNamespace("e1071", quietly = TRUE)) {
+ expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
+ expect_equal(as.character(predict(m, t1[1, ])), "Yes")
+ }
+
+ # Test numeric response variable
+ t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
+ t2 <- t1[-4]
+ df <- suppressWarnings(createDataFrame(t2))
+ m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
+ s <- summary(m)
+ expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
+ expect_equal(sum(s$apriori), 1)
+ expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
+})
+
+sparkR.session.stop()
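
All of the classification wrappers exercised above follow the same
fit / summary / predict / persist cycle. A condensed sketch using spark.logit;
the dataset and the temp path are only illustrative:

  training <- suppressWarnings(createDataFrame(iris))
  model <- spark.logit(training, Species ~ ., regParam = 0.5)
  summary(model)$coefficients                     # one coefficient column per class
  head(predict(model, training))                  # adds a "prediction" column
  modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
  write.ml(model, modelPath)                      # persist the fitted model
  model2 <- read.ml(modelPath)                    # reload it later
  unlink(modelPath)
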
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_clustering.R b/R/pkg/tests/fulltests/test_mllib_clustering.R
new file mode 100644
index 0000000..e827e96
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -0,0 +1,328 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib clustering algorithms")
+
+# Tests for MLlib clustering algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+absoluteSparkPath <- function(x) {
+ sparkHome <- sparkR.conf("spark.home")
+ file.path(sparkHome, x)
+}
+
+test_that("spark.bisectingKmeans", {
+ skip_on_cran()
+
+ newIris <- iris
+ newIris$Species <- NULL
+ training <- suppressWarnings(createDataFrame(newIris))
+
+ take(training, 1)
+
+ model <- spark.bisectingKmeans(data = training, ~ .)
+ sample <- take(select(predict(model, training), "prediction"), 1)
+ expect_equal(typeof(sample$prediction), "integer")
+ expect_equal(sample$prediction, 1)
+
+ # Test fitted works on Bisecting KMeans
+ fitted.model <- fitted(model)
+ expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
+ c(0, 1, 2, 3))
+
+ # Test summary works on bisecting k-means
+ summary.model <- summary(model)
+ cluster <- summary.model$cluster
+ k <- summary.model$k
+ expect_equal(k, 4)
+ expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
+ c(0, 1, 2, 3))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ summary2 <- summary(model2)
+ expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+ expect_equal(summary.model$coefficients, summary2$coefficients)
+ expect_true(!summary.model$is.loaded)
+ expect_true(summary2$is.loaded)
+
+ unlink(modelPath)
+ }
+})
+
+test_that("spark.gaussianMixture", {
+ # R code to reproduce the result.
+ # nolint start
+ #' library(mvtnorm)
+ #' set.seed(1)
+ #' a <- rmvnorm(7, c(0, 0))
+ #' b <- rmvnorm(8, c(10, 10))
+ #' data <- rbind(a, b)
+ #' model <- mvnormalmixEM(data, k = 2)
+ #' model$lambda
+ #
+ # [1] 0.4666667 0.5333333
+ #
+ #' model$mu
+ #
+ # [1] 0.11731091 -0.06192351
+ # [1] 10.363673 9.897081
+ #
+ #' model$sigma
+ #
+ # [[1]]
+ # [,1] [,2]
+ # [1,] 0.62049934 0.06880802
+ # [2,] 0.06880802 1.27431874
+ #
+ # [[2]]
+ # [,1] [,2]
+ # [1,] 0.2961543 0.160783
+ # [2,] 0.1607830 1.008878
+ #
+ #' model$loglik
+ #
+ # [1] -46.89499
+ # nolint end
+ data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808),
+ list(0.3295078, -0.8204684), list(0.4874291, 0.7383247),
+ list(0.5757814, -0.3053884), list(1.5117812, 0.3898432),
+ list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664),
+ list(9.9838097, 10.9438362), list(10.8212212, 10.5939013),
+ list(10.9189774, 10.7821363), list(10.0745650, 8.0106483),
+ list(10.6198257, 9.9438713), list(9.8442045, 8.5292476),
+ list(9.5218499, 10.4179416))
+ df <- createDataFrame(data, c("x1", "x2"))
+ model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2)
+ stats <- summary(model)
+ rLambda <- c(0.4666667, 0.5333333)
+ rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081)
+ rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874,
+ 0.2961543, 0.160783, 0.1607830, 1.008878)
+ rLoglik <- -46.89499
+ expect_equal(stats$lambda, rLambda, tolerance = 1e-3)
+ expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3)
+ expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3)
+ expect_equal(unlist(stats$loglik), rLoglik, tolerance = 1e-3)
+ p <- collect(select(predict(model, df), "prediction"))
+ expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+ expect_equal(stats$lambda, stats2$lambda)
+ expect_equal(unlist(stats$mu), unlist(stats2$mu))
+ expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
+ expect_equal(unlist(stats$loglik), unlist(stats2$loglik))
+
+ unlink(modelPath)
+ }
+})
+
+test_that("spark.kmeans", {
+ newIris <- iris
+ newIris$Species <- NULL
+ training <- suppressWarnings(createDataFrame(newIris))
+
+ take(training, 1)
+
+ model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random")
+ sample <- take(select(predict(model, training), "prediction"), 1)
+ expect_equal(typeof(sample$prediction), "integer")
+ expect_equal(sample$prediction, 1)
+
+ # Test stats::kmeans is working
+ statsModel <- kmeans(x = newIris, centers = 2)
+ expect_equal(sort(unique(statsModel$cluster)), c(1, 2))
+
+ # Test fitted works on KMeans
+ fitted.model <- fitted(model)
+ expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
+
+ # Test summary works on KMeans
+ summary.model <- summary(model)
+ cluster <- summary.model$cluster
+ k <- summary.model$k
+ expect_equal(k, 2)
+ expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
+
+ # Test that summary coefficients are returned as a matrix
+ expect_true(class(summary.model$coefficients) == "matrix")
+ expect_true(class(summary.model$coefficients[1, ]) == "numeric")
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ summary2 <- summary(model2)
+ expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+ expect_equal(summary.model$coefficients, summary2$coefficients)
+ expect_true(!summary.model$is.loaded)
+ expect_true(summary2$is.loaded)
+
+ unlink(modelPath)
+ }
+
+ # Test k-means on a dataset that is sensitive to the seed value
+ col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+ col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+ col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+ cols <- as.data.frame(cbind(col1, col2, col3))
+ df <- createDataFrame(cols)
+
+ model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+ initMode = "random", seed = 1, tol = 1E-5)
+ model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+ initMode = "random", seed = 22222, tol = 1E-5)
+
+ summary.model1 <- summary(model1)
+ summary.model2 <- summary(model2)
+ cluster1 <- summary.model1$cluster
+ cluster2 <- summary.model2$cluster
+ clusterSize1 <- summary.model1$clusterSize
+ clusterSize2 <- summary.model2$clusterSize
+
+ # The predicted clusters are different
+ expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
+ c(0, 1, 2, 3))
+ expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
+ c(0, 1, 2))
+ expect_equal(clusterSize1, 4)
+ expect_equal(clusterSize2, 3)
+})
+
+test_that("spark.lda with libsvm", {
+ text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
+ model <- spark.lda(text, optimizer = "em")
+
+ stats <- summary(model, 10)
+ isDistributed <- stats$isDistributed
+ logLikelihood <- stats$logLikelihood
+ logPerplexity <- stats$logPerplexity
+ vocabSize <- stats$vocabSize
+ topics <- stats$topicTopTerms
+ weights <- stats$topicTopTermsWeights
+ vocabulary <- stats$vocabulary
+ trainingLogLikelihood <- stats$trainingLogLikelihood
+ logPrior <- stats$logPrior
+
+ expect_true(isDistributed)
+ expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+ expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+ expect_equal(vocabSize, 11)
+ expect_true(is.null(vocabulary))
+ expect_true(trainingLogLikelihood <= 0 & !is.na(trainingLogLikelihood))
+ expect_true(logPrior <= 0 & !is.na(logPrior))
+
+ # Test model save/load
+ if (not_cran_or_windows_with_hadoop()) {
+ modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+
+ expect_true(stats2$isDistributed)
+ expect_equal(logLikelihood, stats2$logLikelihood)
+ expect_equal(logPerplexity, stats2$logPerplexity)
+ expect_equal(vocabSize, stats2$vocabSize)
+ expect_equal(vocabulary, stats2$vocabulary)
+ expect_equal(trainingLogLikelihood, stats2$trainingLogLikelihood)
+ expect_equal(logPrior, stats2$logPrior)
+
+ unlink(modelPath)
+ }
+})
+
+test_that("spark.lda with text input", {
+ skip_on_cran()
+
+ text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
+ model <- spark.lda(text, optimizer = "online", features = "value")
+
+ stats <- summary(model)
+ isDistributed <- stats$isDistributed
+ logLikelihood <- stats$logLikelihood
+ logPerplexity <- stats$logPerplexity
+ vocabSize <- stats$vocabSize
+ topics <- stats$topicTopTerms
+ weights <- stats$topicTopTermsWeights
+ vocabulary <- stats$vocabulary
+ trainingLogLikelihood <- stats$trainingLogLikelihood
+ logPrior <- stats$logPrior
+
+ expect_false(isDistributed)
+ expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+ expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+ expect_equal(vocabSize, 10)
+ expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
+ expect_true(is.na(trainingLogLikelihood))
+ expect_true(is.na(logPrior))
+
+ # Test model save/load
+ modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
+ write.ml(model, modelPath)
+ expect_error(write.ml(model, modelPath))
+ write.ml(model, modelPath, overwrite = TRUE)
+ model2 <- read.ml(modelPath)
+ stats2 <- summary(model2)
+
+ expect_false(stats2$isDistributed)
+ expect_equal(logLikelihood, stats2$logLikelihood)
+ expect_equal(logPerplexity, stats2$logPerplexity)
+ expect_equal(vocabSize, stats2$vocabSize)
+ expect_true(all.equal(vocabulary, stats2$vocabulary))
+ expect_true(is.na(stats2$trainingLogLikelihood))
+ expect_true(is.na(stats2$logPrior))
+
+ unlink(modelPath)
+})
+
+test_that("spark.posterior and spark.perplexity", {
+ skip_on_cran()
+
+ text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
+ model <- spark.lda(text, features = "value", k = 3)
+
+ # Assert perplexities are equal
+ stats <- summary(model)
+ logPerplexity <- spark.perplexity(model, text)
+ expect_equal(logPerplexity, stats$logPerplexity)
+
+ # Assert the sum of every topic distribution is equal to 1
+ posterior <- spark.posterior(model, text)
+ local.posterior <- collect(posterior)$topicDistribution
+ expect_equal(length(local.posterior), sum(unlist(local.posterior)))
+})
+
+sparkR.session.stop()
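
The clustering wrappers above share the same workflow. A brief sketch with
spark.kmeans; the choice of k and of the feature columns is illustrative:

  df <- suppressWarnings(createDataFrame(iris[, 1:4]))
  model <- spark.kmeans(df, ~ ., k = 3, maxIter = 10)
  summary(model)$coefficients                     # cluster centers, one row per cluster
  head(fitted(model))                             # assignments for the training data
  head(predict(model, df))                        # assignments for new data
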
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org