Posted to commits@spark.apache.org by fe...@apache.org on 2017/06/11 07:00:40 UTC

[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
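The refactor gates the heavier SparkR tests behind testthat's skip_on_cran(), called at the top of
nearly every test_that() block in the files below; skip_on_cran() skips a test unless the NOT_CRAN
environment variable is set to "true", so only the basic tests run during CRAN checks. A minimal
sketch of the pattern (hypothetical test, not part of this commit):

  library(testthat)

  test_that("heavy Spark operation runs only outside CRAN", {
    # skipped during R CMD check --as-cran; run locally with Sys.setenv(NOT_CRAN = "true")
    skip_on_cran()
    expect_equal(2L + 2L, 4L)
  })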

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R
deleted file mode 100644
index b20b431..0000000
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ /dev/null
@@ -1,167 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("Structured Streaming")
-
-# Tests for Structured Streaming functions in SparkR
-
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-
-jsonSubDir <- file.path("sparkr-test", "json", "")
-if (.Platform$OS.type == "windows") {
-  # file.path removes the empty separator on Windows, so add it back
-  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
-}
-jsonDir <- file.path(tempdir(), jsonSubDir)
-dir.create(jsonDir, recursive = TRUE)
-
-mockLines <- c("{\"name\":\"Michael\"}",
-               "{\"name\":\"Andy\", \"age\":30}",
-               "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-writeLines(mockLines, jsonPath)
-
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
-                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
-                 "{\"name\":\"David\",\"age\":60,\"height\":null}")
-jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-
-schema <- structType(structField("name", "string"),
-                     structField("age", "integer"),
-                     structField("count", "double"))
-
-test_that("read.stream, write.stream, awaitTermination, stopQuery", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
-
-  writeLines(mockLinesNa, jsonPathNa)
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_error(awaitTermination(q), NA)
-})
-
-test_that("print from explain, lastProgress, status, isActive", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
-
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-
-  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
-  expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
-  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
-
-  expect_equal(queryName(q), "people2")
-  expect_true(isActive(q))
-
-  stopQuery(q)
-})
-
-test_that("Stream other format", {
-  skip_on_cran()
-
-  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
-  df <- read.df(jsonPath, "json", schema)
-  write.df(df, parquetPath, "parquet", "overwrite")
-
-  df <- read.stream(path = parquetPath, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
-
-  expect_equal(queryName(q), "people3")
-  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
-              capture.output(lastProgress(q)))))
-  expect_true(isActive(q))
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_false(isActive(q))
-
-  unlink(parquetPath)
-})
-
-test_that("Non-streaming DataFrame", {
-  skip_on_cran()
-
-  c <- as.DataFrame(cars)
-  expect_false(isStreaming(c))
-
-  expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
-               paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
-                      "streaming Dataset/DataFrame).*"))
-})
-
-test_that("Unsupported operation", {
-  skip_on_cran()
-
-  # memory sink without aggregation
-  df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
-  expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
-               paste0(".*(start : analysis error - Complete output mode not supported when there ",
-                      "are no streaming aggregations on streaming DataFrames/Datasets).*"))
-})
-
-test_that("Terminated by error", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1)
-  counts <- count(group_by(df, "name"))
-  # This should not fail before returning a StreamingQuery,
-  # but it may dump an error log at about the same time
-  expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"),
-               NA)
-
-  expect_error(awaitTermination(q, 5 * 1000),
-               paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option",
-                      " 'maxFilesPerTrigger', must be a positive integer).*"))
-
-  expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value",
-              capture.output(status(q)))))
-  expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q)))))
-  expect_equal(queryName(q), "people4")
-  expect_false(isActive(q))
-
-  stopQuery(q)
-})
-
-unlink(jsonPath)
-unlink(jsonPathNa)
-
-sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_take.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_take.R b/R/pkg/inst/tests/testthat/test_take.R
deleted file mode 100644
index c00723b..0000000
--- a/R/pkg/inst/tests/testthat/test_take.R
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("tests RDD function take()")
-
-# Mock data
-numVector <- c(-10:97)
-numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
-strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
-               "violated, but I'm not. No, in fact, I think this is a friendly",
-               "message, like \"Hey, wanna play?\" and yes, I want to play. ",
-               "I really, really do.")
-strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
-                "other times it helps me control the chaos.",
-                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
-                "raising me. But they're both dead now. I didn't kill them. Honest.")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-test_that("take() gives back the original elements in correct count and order", {
-  skip_on_cran()
-
-  numVectorRDD <- parallelize(sc, numVector, 10)
-  # case: number of elements to take is less than the size of the first partition
-  expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1)))
-  # case: number of elements to take is the same as the size of the first partition
-  expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11)))
-  # case: number of elements to take is greater than all elements
-  expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector))
-  expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector))
-
-  numListRDD <- parallelize(sc, numList, 1)
-  numListRDD2 <- parallelize(sc, numList, 4)
-  expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3))
-  expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5))
-  expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1)))
-  expect_equal(takeRDD(numListRDD2, 999), numList)
-
-  strVectorRDD <- parallelize(sc, strVector, 2)
-  strVectorRDD2 <- parallelize(sc, strVector, 3)
-  expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector))
-  expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
-
-  strListRDD <- parallelize(sc, strList, 4)
-  strListRDD2 <- parallelize(sc, strList, 1)
-  expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3)))
-  expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1)))
-
-  expect_equal(length(takeRDD(strListRDD, 0)), 0)
-  expect_equal(length(takeRDD(strVectorRDD, 0)), 0)
-  expect_equal(length(takeRDD(numListRDD, 0)), 0)
-  expect_equal(length(takeRDD(numVectorRDD, 0)), 0)
-})
-
-sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_textFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_textFile.R b/R/pkg/inst/tests/testthat/test_textFile.R
deleted file mode 100644
index e8a961c..0000000
--- a/R/pkg/inst/tests/testthat/test_textFile.R
+++ /dev/null
@@ -1,182 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("the textFile() function")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-mockFile <- c("Spark is pretty.", "Spark is awesome.")
-
-test_that("textFile() on a local file returns an RDD", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName)
-  expect_is(rdd, "RDD")
-  expect_true(countRDD(rdd) > 0)
-  expect_equal(countRDD(rdd), 2)
-
-  unlink(fileName)
-})
-
-test_that("textFile() followed by a collect() returns the same content", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName)
-  expect_equal(collectRDD(rdd), as.list(mockFile))
-
-  unlink(fileName)
-})
-
-test_that("textFile() word count works as expected", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName)
-
-  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
-  wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-  counts <- reduceByKey(wordCount, "+", 2L)
-  output <- collectRDD(counts)
-  expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
-                   list("Spark", 2))
-  expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
-
-  unlink(fileName)
-})
-
-test_that("several transformations on RDD created by textFile()", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName) # RDD
-  for (i in 1:10) {
-    # PipelinedRDD initially created from RDD
-    rdd <- lapply(rdd, function(x) paste(x, x))
-  }
-  collectRDD(rdd)
-
-  unlink(fileName)
-})
-
-test_that("textFile() followed by a saveAsTextFile() returns the same content", {
-  skip_on_cran()
-
-  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName1)
-
-  rdd <- textFile(sc, fileName1, 1L)
-  saveAsTextFile(rdd, fileName2)
-  rdd <- textFile(sc, fileName2)
-  expect_equal(collectRDD(rdd), as.list(mockFile))
-
-  unlink(fileName1)
-  unlink(fileName2)
-})
-
-test_that("saveAsTextFile() on a parallelized list works as expected", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l, 1L)
-  saveAsTextFile(rdd, fileName)
-  rdd <- textFile(sc, fileName)
-  expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)}))
-
-  unlink(fileName)
-})
-
-test_that("textFile() and saveAsTextFile() word count works as expected", {
-  skip_on_cran()
-
-  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName1)
-
-  rdd <- textFile(sc, fileName1)
-
-  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
-  wordCount <- lapply(words, function(word) { list(word, 1L) })
-
-  counts <- reduceByKey(wordCount, "+", 2L)
-
-  saveAsTextFile(counts, fileName2)
-  rdd <- textFile(sc, fileName2)
-
-  output <- collectRDD(rdd)
-  expected <- list(list("awesome.", 1), list("Spark", 2),
-                   list("pretty.", 1), list("is", 2))
-  expectedStr <- lapply(expected, function(x) { toString(x) })
-  expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
-
-  unlink(fileName1)
-  unlink(fileName2)
-})
-
-test_that("textFile() on multiple paths", {
-  skip_on_cran()
-
-  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines("Spark is pretty.", fileName1)
-  writeLines("Spark is awesome.", fileName2)
-
-  rdd <- textFile(sc, c(fileName1, fileName2))
-  expect_equal(countRDD(rdd), 2)
-
-  unlink(fileName1)
-  unlink(fileName2)
-})
-
-test_that("Pipelined operations on RDDs created using textFile", {
-  skip_on_cran()
-
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  rdd <- textFile(sc, fileName)
-
-  lengths <- lapply(rdd, function(x) { length(x) })
-  expect_equal(collectRDD(lengths), list(1, 1))
-
-  lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
-  expect_equal(collectRDD(lengthsPipelined), list(11, 11))
-
-  lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
-  expect_equal(collectRDD(lengths30), list(31, 31))
-
-  lengths20 <- lapply(lengths, function(x) { x + 20 })
-  expect_equal(collectRDD(lengths20), list(21, 21))
-
-  unlink(fileName)
-})
-
-sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
deleted file mode 100644
index 6197ae7..0000000
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ /dev/null
@@ -1,248 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-context("functions in utils.R")
-
-# JavaSparkContext handle
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-
-test_that("convertJListToRList() gives back (deserializes) the original JLists
-          of strings and integers", {
-  skip_on_cran()
-  # It's hard to manually create a Java List using rJava, since it does not
-  # support generics well. Instead, we rely on collectRDD() returning a
-  # JList.
-  nums <- as.list(1:10)
-  rdd <- parallelize(sc, nums, 1L)
-  jList <- callJMethod(rdd@jrdd, "collect")
-  rList <- convertJListToRList(jList, flatten = TRUE)
-  expect_equal(rList, nums)
-
-  strs <- list("hello", "spark")
-  rdd <- parallelize(sc, strs, 2L)
-  jList <- callJMethod(rdd@jrdd, "collect")
-  rList <- convertJListToRList(jList, flatten = TRUE)
-  expect_equal(rList, strs)
-})
-
-test_that("serializeToBytes on RDD", {
-  skip_on_cran()
-  # File content
-  mockFile <- c("Spark is pretty.", "Spark is awesome.")
-  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
-  writeLines(mockFile, fileName)
-
-  text.rdd <- textFile(sc, fileName)
-  expect_equal(getSerializedMode(text.rdd), "string")
-  ser.rdd <- serializeToBytes(text.rdd)
-  expect_equal(collectRDD(ser.rdd), as.list(mockFile))
-  expect_equal(getSerializedMode(ser.rdd), "byte")
-
-  unlink(fileName)
-})
-
-test_that("cleanClosure on R functions", {
-  y <- c(1, 2, 3)
-  g <- function(x) { x + 1 }
-  f <- function(x) { g(x) + y }
-  newF <- cleanClosure(f)
-  env <- environment(newF)
-  expect_equal(length(ls(env)), 2)  # y, g
-  actual <- get("y", envir = env, inherits = FALSE)
-  expect_equal(actual, y)
-  actual <- get("g", envir = env, inherits = FALSE)
-  expect_equal(actual, g)
-
-  # Test for nested enclosures and package variables.
-  env2 <- new.env()
-  funcEnv <- new.env(parent = env2)
-  f <- function(x) { log(g(x) + y) }
-  environment(f) <- funcEnv  # enclosing relationship: f -> funcEnv -> env2 -> .GlobalEnv
-  newF <- cleanClosure(f)
-  env <- environment(newF)
-  expect_equal(length(ls(env)), 2)  # "min" should not be included
-  actual <- get("y", envir = env, inherits = FALSE)
-  expect_equal(actual, y)
-  actual <- get("g", envir = env, inherits = FALSE)
-  expect_equal(actual, g)
-
-  base <- c(1, 2, 3)
-  l <- list(field = matrix(1))
-  field <- matrix(2)
-  defUse <- 3
-  g <- function(x) { x + y }
-  f <- function(x) {
-    defUse <- base::as.integer(x) + 1  # Test for access operators `::`.
-    lapply(x, g) + 1  # Test for capturing function call "g"'s closure as an argument of lapply.
-    l$field[1, 1] <- 3  # Test for access operators `$`.
-    res <- defUse + l$field[1, ]  # Test for def-use chain of "defUse", and "" symbol.
-    f(res)  # Test for recursive calls.
-  }
-  newF <- cleanClosure(f)
-  env <- environment(newF)
-  # TODO(shivaram): length(ls(env)) is 4 here for some reason and `lapply` is included in `env`.
-  # Disabling this test till we debug this.
-  #
-  # nolint start
-  # expect_equal(length(ls(env)), 3)  # Only "g", "l" and "f". No "base", "field" or "defUse".
-  # nolint end
-  expect_true("g" %in% ls(env))
-  expect_true("l" %in% ls(env))
-  expect_true("f" %in% ls(env))
-  expect_equal(get("l", envir = env, inherits = FALSE), l)
-  # "y" should be in the environemnt of g.
-  newG <- get("g", envir = env, inherits = FALSE)
-  env <- environment(newG)
-  expect_equal(length(ls(env)), 1)
-  actual <- get("y", envir = env, inherits = FALSE)
-  expect_equal(actual, y)
-
-  # Test for function (and variable) definitions.
-  f <- function(x) {
-    g <- function(y) { y * 2 }
-    g(x)
-  }
-  newF <- cleanClosure(f)
-  env <- environment(newF)
-  expect_equal(length(ls(env)), 0)  # "y" and "g" should not be included.
-
-  # Test for overriding variables in base namespace (Issue: SparkR-196).
-  nums <- as.list(1:10)
-  rdd <- parallelize(sc, nums, 2L)
-  t <- 4  # Override base::t in .GlobalEnv.
-  f <- function(x) { x > t }
-  newF <- cleanClosure(f)
-  env <- environment(newF)
-  expect_equal(ls(env), "t")
-  expect_equal(get("t", envir = env, inherits = FALSE), t)
-  actual <- collectRDD(lapply(rdd, f))
-  expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
-  expect_equal(actual, expected)
-
-  # Test for broadcast variables.
-  a <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
-  aBroadcast <- broadcastRDD(sc, a)
-  normMultiply <- function(x) { norm(aBroadcast$value) * x }
-  newnormMultiply <- SparkR:::cleanClosure(normMultiply)
-  env <- environment(newnormMultiply)
-  expect_equal(ls(env), "aBroadcast")
-  expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
-})
-
-test_that("varargsToJProperties", {
-  jprops <- newJObject("java.util.Properties")
-  expect_true(class(jprops) == "jobj")
-
-  jprops <- varargsToJProperties(abc = "123")
-  expect_true(class(jprops) == "jobj")
-  expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")
-
-  jprops <- varargsToJProperties(abc = "abc", b = 1)
-  expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
-  expect_equal(callJMethod(jprops, "getProperty", "b"), "1")
-
-  jprops <- varargsToJProperties()
-  expect_equal(callJMethod(jprops, "size"), 0L)
-})
-
-test_that("convertToJSaveMode", {
-  s <- convertToJSaveMode("error")
-  expect_true(class(s) == "jobj")
-  expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
-  expect_error(convertToJSaveMode("foo"),
-    'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
-})
-
-test_that("captureJVMException", {
-  skip_on_cran()
-
-  method <- "createStructField"
-  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,
-                                    "col", "unknown", TRUE),
-                        error = function(e) {
-                          captureJVMException(e, method)
-                        }),
-               "parse error - .*DataType unknown.*not supported.")
-})
-
-test_that("hashCode", {
-  skip_on_cran()
-
-  expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA)
-})
-
-test_that("overrideEnvs", {
-  config <- new.env()
-  config[["spark.master"]] <- "foo"
-  config[["config_only"]] <- "ok"
-  param <- new.env()
-  param[["spark.master"]] <- "local"
-  param[["param_only"]] <- "blah"
-  overrideEnvs(config, param)
-  expect_equal(config[["spark.master"]], "local")
-  expect_equal(config[["param_only"]], "blah")
-  expect_equal(config[["config_only"]], "ok")
-})
-
-test_that("rbindRaws", {
-
-  # Mixed Column types
-  r <- serialize(1:5, connection = NULL)
-  r1 <- serialize(1, connection = NULL)
-  r2 <- serialize(letters, connection = NULL)
-  r3 <- serialize(1:10, connection = NULL)
-  inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r),
-                    list(3L, r3, "c", r))
-  expected <- data.frame(V1 = 1:3)
-  expected$V2 <- list(r1, r2, r3)
-  expected$V3 <- c("a", "b", "c")
-  expected$V4 <- list(r, r, r)
-  result <- rbindRaws(inputData)
-  expect_equal(expected, result)
-
-  # Single binary column
-  input <- list(list(r1), list(r2), list(r3))
-  expected <- subset(expected, select = "V2")
-  result <- setNames(rbindRaws(input), "V2")
-  expect_equal(expected, result)
-
-})
-
-test_that("varargsToStrEnv", {
-  strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
-  env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
-  expect_equal(strenv, env)
-  expect_error(varargsToStrEnv(a = list(1, "a")),
-               paste0("Unsupported type for a : list. Supported types are logical, ",
-                      "numeric, character and NULL."))
-  expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.")
-  expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
-})
-
-test_that("basenameSansExtFromUrl", {
-  x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-",
-              "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
-  expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7")
-  z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz"
-  expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
-})
-
-sparkR.session.stop()
-
-message("--- End test (utils) ", as.POSIXct(Sys.time(), tz = "GMT"))
-message("elapsed ", (proc.time() - timer_ptm)[3])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/jarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/jarTest.R b/R/pkg/tests/fulltests/jarTest.R
new file mode 100644
index 0000000..e2241e0
--- /dev/null
+++ b/R/pkg/tests/fulltests/jarTest.R
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+
+sc <- sparkR.session(master = "local[1]")
+
+helloTest <- SparkR:::callJStatic("sparkrtest.DummyClass",
+                                  "helloWorld",
+                                  "Dave")
+stopifnot(identical(helloTest, "Hello Dave"))
+
+basicFunction <- SparkR:::callJStatic("sparkrtest.DummyClass",
+                                      "addStuff",
+                                      2L,
+                                      2L)
+stopifnot(basicFunction == 4L)
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/packageInAJarTest.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/packageInAJarTest.R b/R/pkg/tests/fulltests/packageInAJarTest.R
new file mode 100644
index 0000000..ac70626
--- /dev/null
+++ b/R/pkg/tests/fulltests/packageInAJarTest.R
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+library(SparkR)
+library(sparkPackageTest)
+
+sparkR.session(master = "local[1]")
+
+run1 <- myfunc(5L)
+
+run2 <- myfunc(-4L)
+
+sparkR.session.stop()
+
+if (run1 != 6) quit(save = "no", status = 1)
+
+if (run2 != -3) quit(save = "no", status = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_Serde.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R
new file mode 100644
index 0000000..6e160fa
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_Serde.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("SerDe functionality")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("SerDe of primitive types", {
+  skip_on_cran()
+
+  x <- callJStatic("SparkRHandler", "echo", 1L)
+  expect_equal(x, 1L)
+  expect_equal(class(x), "integer")
+
+  x <- callJStatic("SparkRHandler", "echo", 1)
+  expect_equal(x, 1)
+  expect_equal(class(x), "numeric")
+
+  x <- callJStatic("SparkRHandler", "echo", TRUE)
+  expect_true(x)
+  expect_equal(class(x), "logical")
+
+  x <- callJStatic("SparkRHandler", "echo", "abc")
+  expect_equal(x, "abc")
+  expect_equal(class(x), "character")
+})
+
+test_that("SerDe of list of primitive types", {
+  skip_on_cran()
+
+  x <- list(1L, 2L, 3L)
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+  expect_equal(class(y[[1]]), "integer")
+
+  x <- list(1, 2, 3)
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+  expect_equal(class(y[[1]]), "numeric")
+
+  x <- list(TRUE, FALSE)
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+  expect_equal(class(y[[1]]), "logical")
+
+  x <- list("a", "b", "c")
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+  expect_equal(class(y[[1]]), "character")
+
+  # Empty list
+  x <- list()
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+})
+
+test_that("SerDe of list of lists", {
+  skip_on_cran()
+
+  x <- list(list(1L, 2L, 3L), list(1, 2, 3),
+            list(TRUE, FALSE), list("a", "b", "c"))
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+
+  # List of empty lists
+  x <- list(list(), list())
+  y <- callJStatic("SparkRHandler", "echo", x)
+  expect_equal(x, y)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_Windows.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_Windows.R b/R/pkg/tests/fulltests/test_Windows.R
new file mode 100644
index 0000000..00d684e
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_Windows.R
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+context("Windows-specific tests")
+
+test_that("sparkJars tag in SparkContext", {
+  skip_on_cran()
+
+  if (.Platform$OS.type != "windows") {
+    skip("This test is only for Windows, skipped")
+  }
+
+  testOutput <- launchScript("ECHO", "a/b/c", wait = TRUE)
+  abcPath <- testOutput[1]
+  expect_equal(abcPath, "a\\b\\c")
+})
+
+message("--- End test (Windows) ", as.POSIXct(Sys.time(), tz = "GMT"))
+message("elapsed ", (proc.time() - timer_ptm)[3])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_binaryFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_binaryFile.R b/R/pkg/tests/fulltests/test_binaryFile.R
new file mode 100644
index 0000000..00954fa
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_binaryFile.R
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions on binary files")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("saveAsObjectFile()/objectFile() following textFile() works", {
+  skip_on_cran()
+
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1, 1)
+  saveAsObjectFile(rdd, fileName2)
+  rdd <- objectFile(sc, fileName2)
+  expect_equal(collectRDD(rdd), as.list(mockFile))
+
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+
+  l <- list(1, 2, 3)
+  rdd <- parallelize(sc, l, 1)
+  saveAsObjectFile(rdd, fileName)
+  rdd <- objectFile(sc, fileName)
+  expect_equal(collectRDD(rdd), l)
+
+  unlink(fileName, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() following RDD transformations works", {
+  skip_on_cran()
+
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+
+  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  counts <- reduceByKey(wordCount, "+", 2L)
+
+  saveAsObjectFile(counts, fileName2)
+  counts <- objectFile(sc, fileName2)
+
+  output <- collectRDD(counts)
+  expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
+                    list("is", 2))
+  expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+  unlink(fileName1)
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
+  skip_on_cran()
+
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+
+  rdd1 <- parallelize(sc, "Spark is pretty.")
+  saveAsObjectFile(rdd1, fileName1)
+  rdd2 <- parallelize(sc, "Spark is awesome.")
+  saveAsObjectFile(rdd2, fileName2)
+
+  rdd <- objectFile(sc, c(fileName1, fileName2))
+  expect_equal(countRDD(rdd), 2)
+
+  unlink(fileName1, recursive = TRUE)
+  unlink(fileName2, recursive = TRUE)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_binary_function.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_binary_function.R b/R/pkg/tests/fulltests/test_binary_function.R
new file mode 100644
index 0000000..236cb38
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_binary_function.R
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("binary functions")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Data
+nums <- 1:10
+rdd <- parallelize(sc, nums, 2L)
+
+# File content
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("union on two RDDs", {
+  skip_on_cran()
+
+  actual <- collectRDD(unionRDD(rdd, rdd))
+  expect_equal(actual, as.list(rep(nums, 2)))
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  text.rdd <- textFile(sc, fileName)
+  union.rdd <- unionRDD(rdd, text.rdd)
+  actual <- collectRDD(union.rdd)
+  expect_equal(actual, c(as.list(nums), mockFile))
+  expect_equal(getSerializedMode(union.rdd), "byte")
+
+  rdd <- map(text.rdd, function(x) {x})
+  union.rdd <- unionRDD(rdd, text.rdd)
+  actual <- collectRDD(union.rdd)
+  expect_equal(actual, as.list(c(mockFile, mockFile)))
+  expect_equal(getSerializedMode(union.rdd), "byte")
+
+  unlink(fileName)
+})
+
+test_that("cogroup on two RDDs", {
+  skip_on_cran()
+
+  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
+  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
+  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+  actual <- collectRDD(cogroup.rdd)
+  expect_equal(actual,
+               list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
+
+  rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
+  rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
+  cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
+  actual <- collectRDD(cogroup.rdd)
+
+  expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
+  expect_equal(sortKeyValueList(actual),
+               sortKeyValueList(expected))
+})
+
+test_that("zipPartitions() on RDDs", {
+  skip_on_cran()
+
+  rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
+  rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
+  rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
+  actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3,
+                                  func = function(x, y, z) { list(list(x, y, z))} ))
+  expect_equal(actual,
+               list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6))))
+
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName, 1)
+  actual <- collectRDD(zipPartitions(rdd, rdd,
+                                  func = function(x, y) { list(paste(x, y, sep = "\n")) }))
+  expected <- list(paste(mockFile, mockFile, sep = "\n"))
+  expect_equal(actual, expected)
+
+  rdd1 <- parallelize(sc, 0:1, 1)
+  actual <- collectRDD(zipPartitions(rdd1, rdd,
+                                  func = function(x, y) { list(x + nchar(y)) }))
+  expected <- list(0:1 + nchar(mockFile))
+  expect_equal(actual, expected)
+
+  rdd <- map(rdd, function(x) { x })
+  actual <- collectRDD(zipPartitions(rdd, rdd1,
+                                  func = function(x, y) { list(y + nchar(x)) }))
+  expect_equal(actual, expected)
+
+  unlink(fileName)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_broadcast.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_broadcast.R b/R/pkg/tests/fulltests/test_broadcast.R
new file mode 100644
index 0000000..2c96740
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_broadcast.R
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("broadcast variables")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Partitioned data
+nums <- 1:2
+rrdd <- parallelize(sc, nums, 2L)
+
+test_that("using broadcast variable", {
+  skip_on_cran()
+
+  randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
+  randomMatBr <- broadcastRDD(sc, randomMat)
+
+  useBroadcast <- function(x) {
+    sum(SparkR:::value(randomMatBr) * x)
+  }
+  actual <- collectRDD(lapply(rrdd, useBroadcast))
+  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+  expect_equal(actual, expected)
+})
+
+test_that("without using broadcast variable", {
+  skip_on_cran()
+
+  randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
+
+  useBroadcast <- function(x) {
+    sum(randomMat * x)
+  }
+  actual <- collectRDD(lapply(rrdd, useBroadcast))
+  expected <- list(sum(randomMat) * 1, sum(randomMat) * 2)
+  expect_equal(actual, expected)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_client.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_client.R b/R/pkg/tests/fulltests/test_client.R
new file mode 100644
index 0000000..3d53beb
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_client.R
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in client.R")
+
+test_that("adding spark-testing-base as a package works", {
+  skip_on_cran()
+
+  args <- generateSparkSubmitArgs("", "", "", "",
+                                  "holdenk:spark-testing-base:1.3.0_0.0.5")
+  expect_equal(gsub("[[:space:]]", "", args),
+               gsub("[[:space:]]", "",
+                    "--packages holdenk:spark-testing-base:1.3.0_0.0.5"))
+})
+
+test_that("no package specified doesn't add packages flag", {
+  skip_on_cran()
+
+  args <- generateSparkSubmitArgs("", "", "", "", "")
+  expect_equal(gsub("[[:space:]]", "", args),
+               "")
+})
+
+test_that("multiple packages don't produce a warning", {
+  skip_on_cran()
+
+  expect_warning(generateSparkSubmitArgs("", "", "", "", c("A", "B")), NA)
+})
+
+test_that("sparkJars sparkPackages as character vectors", {
+  skip_on_cran()
+
+  args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
+                                  c("com.databricks:spark-avro_2.10:2.0.1"))
+  expect_match(args, "--jars one.jar,two.jar,three.jar")
+  expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1")
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R
new file mode 100644
index 0000000..f6d9f54
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_context.R
@@ -0,0 +1,226 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("test functions in sparkR.R")
+
+test_that("Check masked functions", {
+  skip_on_cran()
+
+  # Check that we are not masking any new function from base, stats, testthat unexpectedly
+  # NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it
+  # hard for users to use base R functions. Please check when in doubt.
+  namesOfMaskedCompletely <- c("cov", "filter", "sample", "not")
+  namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
+                     "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
+                     "summary", "transform", "drop", "window", "as.data.frame", "union", "not")
+  if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
+    namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
+  }
+  masked <- conflicts(detail = TRUE)$`package:SparkR`
+  expect_true("describe" %in% masked)  # only when with testthat..
+  func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] })
+  funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func)
+  maskedBySparkR <- masked[funcSparkROrEmpty]
+  expect_equal(length(maskedBySparkR), length(namesOfMasked))
+  # make the 2 lists the same length so expect_equal will print their content
+  l <- max(length(maskedBySparkR), length(namesOfMasked))
+  length(maskedBySparkR) <- l
+  length(namesOfMasked) <- l
+  expect_equal(sort(maskedBySparkR, na.last = TRUE), sort(namesOfMasked, na.last = TRUE))
+  # above are those reported as masked when `library(SparkR)`
+  # note that many of these methods are still callable without base:: or stats:: prefix
+  # there should be a test for each of these, except the following, which are currently "broken"
+  funcHasAny <- unlist(lapply(masked, function(x) {
+                                        any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1])))
+                                      }))
+  maskedCompletely <- masked[!funcHasAny]
+  expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely))
+  l <- max(length(maskedCompletely), length(namesOfMaskedCompletely))
+  length(maskedCompletely) <- l
+  length(namesOfMaskedCompletely) <- l
+  expect_equal(sort(maskedCompletely, na.last = TRUE),
+               sort(namesOfMaskedCompletely, na.last = TRUE))
+})
+
+test_that("repeatedly starting and stopping SparkR", {
+  skip_on_cran()
+
+  for (i in 1:4) {
+    sc <- suppressWarnings(sparkR.init(master = sparkRTestMaster))
+    rdd <- parallelize(sc, 1:20, 2L)
+    expect_equal(countRDD(rdd), 20)
+    suppressWarnings(sparkR.stop())
+  }
+})
+
+test_that("repeatedly starting and stopping SparkSession", {
+  for (i in 1:4) {
+    sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+    df <- createDataFrame(data.frame(dummy = 1:i))
+    expect_equal(count(df), i)
+    sparkR.session.stop()
+  }
+})
+
+test_that("rdd GC across sparkR.stop", {
+  skip_on_cran()
+
+  sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0
+  rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
+  rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2
+  sparkR.session.stop()
+
+  sc <- sparkR.sparkContext(master = sparkRTestMaster) # sc should get id 0 again
+
+  # GC rdd1 before creating rdd3 and rdd2 after
+  rm(rdd1)
+  gc()
+
+  rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now
+  rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now
+
+  rm(rdd2)
+  gc()
+
+  countRDD(rdd3)
+  countRDD(rdd4)
+  sparkR.session.stop()
+})
+
+test_that("job group functions can be called", {
+  skip_on_cran()
+
+  sc <- sparkR.sparkContext(master = sparkRTestMaster)
+  setJobGroup("groupId", "job description", TRUE)
+  cancelJobGroup("groupId")
+  clearJobGroup()
+
+  suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
+  suppressWarnings(cancelJobGroup(sc, "groupId"))
+  suppressWarnings(clearJobGroup(sc))
+  sparkR.session.stop()
+})
+
+test_that("utility function can be called", {
+  skip_on_cran()
+
+  sparkR.sparkContext(master = sparkRTestMaster)
+  setLogLevel("ERROR")
+  sparkR.session.stop()
+})
+
+test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", {
+  skip_on_cran()
+
+  e <- new.env()
+  e[["spark.driver.memory"]] <- "512m"
+  ops <- getClientModeSparkSubmitOpts("sparkrmain", e)
+  expect_equal("--driver-memory \"512m\" sparkrmain", ops)
+
+  e[["spark.driver.memory"]] <- "5g"
+  e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint
+  e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings"
+  e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint
+  e[["random"]] <- "skipthis"
+  ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e)
+  # nolint start
+  expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"",
+                      "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"",
+                      "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell"))
+  # nolint end
+
+  e[["spark.driver.extraClassPath"]] <- "/" # too short
+  ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e)
+  # nolint start
+  expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ",
+                      "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"",
+                      " --driver-memory 4g sparkr-shell2"))
+  # nolint end
+})
+
+test_that("sparkJars sparkPackages as comma-separated strings", {
+  skip_on_cran()
+
+  expect_warning(processSparkJars(" a, b "))
+  jars <- suppressWarnings(processSparkJars(" a, b "))
+  expect_equal(lapply(jars, basename), list("a", "b"))
+
+  jars <- suppressWarnings(processSparkJars(" abc ,, def "))
+  expect_equal(lapply(jars, basename), list("abc", "def"))
+
+  jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b")))
+  expect_equal(lapply(jars, basename), list("abc", "def", "xyz", "a", "b"))
+
+  p <- processSparkPackages(c("ghi", "lmn"))
+  expect_equal(p, c("ghi", "lmn"))
+
+  # check normalizePath
+  f <- dir()[[1]]
+  expect_warning(processSparkJars(f), NA)
+  expect_match(processSparkJars(f), f)
+})
+
+test_that("spark.lapply should perform simple transforms", {
+  sparkR.sparkContext(master = sparkRTestMaster)
+  doubled <- spark.lapply(1:10, function(x) { 2 * x })
+  expect_equal(doubled, as.list(2 * 1:10))
+  sparkR.session.stop()
+})
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+  skip_on_cran()
+
+  sparkR.sparkContext(master = sparkRTestMaster)
+  # Test add file.
+  path <- tempfile(pattern = "hello", fileext = ".txt")
+  filename <- basename(path)
+  words <- "Hello World!"
+  writeLines(words, path)
+  spark.addFile(path)
+  download_path <- spark.getSparkFiles(filename)
+  expect_equal(readLines(download_path), words)
+
+  # Test spark.getSparkFiles works well on executors.
+  seq <- seq(from = 1, to = 10, length.out = 5)
+  f <- function(seq) { spark.getSparkFiles(filename) }
+  results <- spark.lapply(seq, f)
+  for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
+
+  unlink(path)
+
+  # Test add directory recursively.
+  path <- paste0(tempdir(), "/", "recursive_dir")
+  dir.create(path)
+  dir_name <- basename(path)
+  path1 <- paste0(path, "/", "hello.txt")
+  file.create(path1)
+  sub_path <- paste0(path, "/", "sub_hello")
+  dir.create(sub_path)
+  path2 <- paste0(sub_path, "/", "sub_hello.txt")
+  file.create(path2)
+  words <- "Hello World!"
+  sub_words <- "Sub Hello World!"
+  writeLines(words, path1)
+  writeLines(sub_words, path2)
+  spark.addFile(path, recursive = TRUE)
+  download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt"))
+  expect_equal(readLines(download_path1), words)
+  download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt"))
+  expect_equal(readLines(download_path2), sub_words)
+  unlink(path, recursive = TRUE)
+  sparkR.session.stop()
+})

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_includePackage.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_includePackage.R b/R/pkg/tests/fulltests/test_includePackage.R
new file mode 100644
index 0000000..d7d9eee
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_includePackage.R
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("include R packages")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
+
+# Partitioned data
+nums <- 1:2
+rdd <- parallelize(sc, nums, 2L)
+
+test_that("include inside function", {
+  skip_on_cran()
+
+  # Only run the test if plyr is installed.
+  if ("plyr" %in% rownames(installed.packages())) {
+    suppressPackageStartupMessages(library(plyr))
+    generateData <- function(x) {
+      suppressPackageStartupMessages(library(plyr))
+      attach(airquality)
+      result <- transform(Ozone, logOzone = log(Ozone))
+      result
+    }
+
+    data <- lapplyPartition(rdd, generateData)
+    actual <- collectRDD(data)
+  }
+})
+
+test_that("use include package", {
+  skip_on_cran()
+
+  # Only run the test if plyr is installed.
+  if ("plyr" %in% rownames(installed.packages())) {
+    suppressPackageStartupMessages(library(plyr))
+    generateData <- function(x) {
+      attach(airquality)
+      result <- transform(Ozone, logOzone = log(Ozone))
+      result
+    }
+
+    includePackage(sc, plyr)
+    data <- lapplyPartition(rdd, generateData)
+    actual <- collectRDD(data)
+  }
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_jvm_api.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_jvm_api.R b/R/pkg/tests/fulltests/test_jvm_api.R
new file mode 100644
index 0000000..8b3b4f7
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_jvm_api.R
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("JVM API")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("Create and call methods on object", {
+  jarr <- sparkR.newJObject("java.util.ArrayList")
+  # Add an element to the array
+  sparkR.callJMethod(jarr, "add", 1L)
+  # Check if get returns the same element
+  expect_equal(sparkR.callJMethod(jarr, "get", 0L), 1L)
+})
+
+test_that("Call static methods", {
+  # Convert a boolean to a string
+  strTrue <- sparkR.callJStatic("java.lang.String", "valueOf", TRUE)
+  expect_equal(strTrue, "true")
+})
+
+sparkR.session.stop()
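
The JVM API functions covered above can likewise be used interactively; a minimal sketch, assuming a local Spark installation (the master URL is again illustrative):

  library(SparkR)
  sparkR.session(master = "local[1]", enableHiveSupport = FALSE)

  # Construct a java.util.ArrayList on the JVM and drive it from R
  jarr <- sparkR.newJObject("java.util.ArrayList")
  sparkR.callJMethod(jarr, "add", 1L)
  sparkR.callJMethod(jarr, "get", 0L)                       # returns 1
  sparkR.callJStatic("java.lang.String", "valueOf", TRUE)   # "true"

  sparkR.session.stop()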

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_classification.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R
new file mode 100644
index 0000000..82e588d
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_classification.R
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib classification algorithms, except for tree-based algorithms")
+
+# Tests for MLlib classification algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+absoluteSparkPath <- function(x) {
+  sparkHome <- sparkR.conf("spark.home")
+  file.path(sparkHome, x)
+}
+
+test_that("spark.svmLinear", {
+  skip_on_cran()
+
+  df <- suppressWarnings(createDataFrame(iris))
+  training <- df[df$Species %in% c("versicolor", "virginica"), ]
+  model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
+  summary <- summary(model)
+
+  # test summary coefficients return matrix type
+  expect_true(class(summary$coefficients) == "matrix")
+  expect_true(class(summary$coefficients[, 1]) == "numeric")
+
+  coefs <- summary$coefficients[, "Estimate"]
+  expected_coefs <- c(-0.06004978, -0.1563083, -0.460648, 0.2276626, 1.055085)
+  expect_true(all(abs(coefs - expected_coefs) < 0.1))
+
+  # Test prediction with string label
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+  expected <- c("versicolor", "versicolor", "versicolor", "virginica",  "virginica",
+                "virginica",  "virginica",  "virginica",  "virginica",  "virginica")
+  expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)
+
+  # Test model save and load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    coefs <- summary(model)$coefficients
+    coefs2 <- summary(model2)$coefficients
+    expect_equal(coefs, coefs2)
+    unlink(modelPath)
+  }
+
+  # Test prediction with numeric label
+  label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+  feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+  data <- as.data.frame(cbind(label, feature))
+  df <- createDataFrame(data)
+  model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
+  prediction <- collect(select(predict(model, df), "prediction"))
+  expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+})
+
+test_that("spark.logit", {
+  # R code to reproduce the result.
+  # nolint start
+  #' library(glmnet)
+  #' iris.x = as.matrix(iris[, 1:4])
+  #' iris.y = as.factor(as.character(iris[, 5]))
+  #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
+  #' coef(logit)
+  #
+  # $setosa
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  #               1.0981324
+  # Sepal.Length -0.2909860
+  # Sepal.Width   0.5510907
+  # Petal.Length -0.1915217
+  # Petal.Width  -0.4211946
+  #
+  # $versicolor
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  #               1.520061e+00
+  # Sepal.Length  2.524501e-02
+  # Sepal.Width  -5.310313e-01
+  # Petal.Length  3.656543e-02
+  # Petal.Width  -3.144464e-05
+  #
+  # $virginica
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  #              -2.61819385
+  # Sepal.Length  0.26574097
+  # Sepal.Width  -0.02005932
+  # Petal.Length  0.15495629
+  # Petal.Width   0.42122607
+  # nolint end
+
+  # Test multinomial logistic regression against three classes
+  df <- suppressWarnings(createDataFrame(iris))
+  model <- spark.logit(df, Species ~ ., regParam = 0.5)
+  summary <- summary(model)
+
+  # test summary coefficients return matrix type
+  expect_true(class(summary$coefficients) == "matrix")
+  expect_true(class(summary$coefficients[, 1]) == "numeric")
+
+  versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
+  virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42)
+  setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42)
+  versicolorCoefs <- summary$coefficients[, "versicolor"]
+  virginicaCoefs <- summary$coefficients[, "virginica"]
+  setosaCoefs <- summary$coefficients[, "setosa"]
+  expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
+  expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
+  expect_true(all(abs(setosaCoefsR - setosaCoefs) < 0.1))
+
+  # Test model save and load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    coefs <- summary(model)$coefficients
+    coefs2 <- summary(model2)$coefficients
+    expect_equal(coefs, coefs2)
+    unlink(modelPath)
+  }
+
+  # R code to reproduce the result.
+  # nolint start
+  #' library(glmnet)
+  #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ]
+  #' iris.x = as.matrix(iris2[, 1:4])
+  #' iris.y = as.factor(as.character(iris2[, 5]))
+  #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5)
+  #' coef(logit)
+  #
+  # $versicolor
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  #               3.93844796
+  # Sepal.Length -0.13538675
+  # Sepal.Width  -0.02386443
+  # Petal.Length -0.35076451
+  # Petal.Width  -0.77971954
+  #
+  # $virginica
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  #              -3.93844796
+  # Sepal.Length  0.13538675
+  # Sepal.Width   0.02386443
+  # Petal.Length  0.35076451
+  # Petal.Width   0.77971954
+  #
+  #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5)
+  #' coef(logit)
+  #
+  # 5 x 1 sparse Matrix of class "dgCMatrix"
+  # s0
+  # (Intercept)  -6.0824412
+  # Sepal.Length  0.2458260
+  # Sepal.Width   0.1642093
+  # Petal.Length  0.4759487
+  # Petal.Width   1.0383948
+  #
+  # nolint end
+
+  # Test multinomial logistic regression against two classes
+  df <- suppressWarnings(createDataFrame(iris))
+  training <- df[df$Species %in% c("versicolor", "virginica"), ]
+  model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial")
+  summary <- summary(model)
+  versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78)
+  virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78)
+  versicolorCoefs <- summary$coefficients[, "versicolor"]
+  virginicaCoefs <- summary$coefficients[, "virginica"]
+  expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1))
+  expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1))
+
+  # Test binomial logistic regression against two classes
+  model <- spark.logit(training, Species ~ ., regParam = 0.5)
+  summary <- summary(model)
+  coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04)
+  coefs <- summary$coefficients[, "Estimate"]
+  expect_true(all(abs(coefsR - coefs) < 0.1))
+
+  # Test prediction with string label
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+  expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor",
+                "versicolor", "versicolor", "versicolor", "versicolor", "versicolor")
+  expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
+
+  # Test prediction with numeric label
+  label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+  feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+  data <- as.data.frame(cbind(label, feature))
+  df <- createDataFrame(data)
+  model <- spark.logit(df, label ~ feature)
+  prediction <- collect(select(predict(model, df), "prediction"))
+  expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+  # Test prediction with weightCol
+  weight <- c(2.0, 2.0, 2.0, 1.0, 1.0)
+  data2 <- as.data.frame(cbind(label, feature, weight))
+  df2 <- createDataFrame(data2)
+  model2 <- spark.logit(df2, label ~ feature, weightCol = "weight")
+  prediction2 <- collect(select(predict(model2, df2), "prediction"))
+  expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))
+})
+
+test_that("spark.mlp", {
+  skip_on_cran()
+
+  df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+                source = "libsvm")
+  model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
+                     solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
+
+  # Test summary method
+  summary <- summary(model)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
+  expect_equal(summary$layers, c(4, 5, 4, 3))
+  expect_equal(length(summary$weights), 64)
+  expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
+               tolerance = 1e-6)
+
+  # Test predict method
+  mlpTestDF <- df
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+
+    expect_equal(summary2$numOfInputs, 4)
+    expect_equal(summary2$numOfOutputs, 3)
+    expect_equal(summary2$layers, c(4, 5, 4, 3))
+    expect_equal(length(summary2$weights), 64)
+
+    unlink(modelPath)
+  }
+
+  # Test default parameter
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+  # Test illegal parameter
+  expect_error(spark.mlp(df, label ~ features, layers = NULL),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c()),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c(3)),
+               "layers must be a integer vector with length > 1.")
+
+  # Test random seed
+  # default seed
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+  # seed equals 10
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+  # test initialWeights
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
+    c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
+
+  # Test formula works well
+  df <- suppressWarnings(createDataFrame(iris))
+  model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
+                     layers = c(4, 3))
+  summary <- summary(model)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
+  expect_equal(summary$layers, c(4, 3))
+  expect_equal(length(summary$weights), 15)
+})
+
+test_that("spark.naiveBayes", {
+  # R code to reproduce the result.
+  # We do not support instance weights yet, so we ignore the frequencies.
+  #
+  #' library(e1071)
+  #' t <- as.data.frame(Titanic)
+  #' t1 <- t[t$Freq > 0, -5]
+  #' m <- naiveBayes(Survived ~ ., data = t1)
+  #' m
+  #' predict(m, t1)
+  #
+  # -- output of 'm'
+  #
+  # A-priori probabilities:
+  # Y
+  #        No       Yes
+  # 0.4166667 0.5833333
+  #
+  # Conditional probabilities:
+  #      Class
+  # Y           1st       2nd       3rd      Crew
+  #   No  0.2000000 0.2000000 0.4000000 0.2000000
+  #   Yes 0.2857143 0.2857143 0.2857143 0.1428571
+  #
+  #      Sex
+  # Y     Male Female
+  #   No   0.5    0.5
+  #   Yes  0.5    0.5
+  #
+  #      Age
+  # Y         Child     Adult
+  #   No  0.2000000 0.8000000
+  #   Yes 0.4285714 0.5714286
+  #
+  # -- output of 'predict(m, t1)'
+  #
+  # Yes Yes Yes Yes No  No  Yes Yes No  No  Yes Yes Yes Yes Yes Yes Yes Yes No  No  Yes Yes No  No
+  #
+
+  t <- as.data.frame(Titanic)
+  t1 <- t[t$Freq > 0, -5]
+  df <- suppressWarnings(createDataFrame(t1))
+  m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0)
+  s <- summary(m)
+  expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6)
+  expect_equal(sum(s$apriori), 1)
+  expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6)
+  p <- collect(select(predict(m, df), "prediction"))
+  expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No",
+                               "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No",
+                               "Yes", "Yes", "No", "No"))
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp")
+    write.ml(m, modelPath)
+    expect_error(write.ml(m, modelPath))
+    write.ml(m, modelPath, overwrite = TRUE)
+    m2 <- read.ml(modelPath)
+    s2 <- summary(m2)
+    expect_equal(s$apriori, s2$apriori)
+    expect_equal(s$tables, s2$tables)
+
+    unlink(modelPath)
+  }
+
+  # Test e1071::naiveBayes
+  if (requireNamespace("e1071", quietly = TRUE)) {
+    expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
+    expect_equal(as.character(predict(m, t1[1, ])), "Yes")
+  }
+
+  # Test numeric response variable
+  t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
+  t2 <- t1[-4]
+  df <- suppressWarnings(createDataFrame(t2))
+  m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
+  s <- summary(m)
+  expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
+  expect_equal(sum(s$apriori), 1)
+  expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_clustering.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_mllib_clustering.R b/R/pkg/tests/fulltests/test_mllib_clustering.R
new file mode 100644
index 0000000..e827e96
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -0,0 +1,328 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib clustering algorithms")
+
+# Tests for MLlib clustering algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+absoluteSparkPath <- function(x) {
+  sparkHome <- sparkR.conf("spark.home")
+  file.path(sparkHome, x)
+}
+
+test_that("spark.bisectingKmeans", {
+  skip_on_cran()
+
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(newIris))
+
+  take(training, 1)
+
+  model <- spark.bisectingKmeans(data = training, ~ .)
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
+
+  # Test fitted works on Bisecting KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test summary works on KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  k <- summary.model$k
+  expect_equal(k, 4)
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
+               c(0, 1, 2, 3))
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+    expect_equal(summary.model$coefficients, summary2$coefficients)
+    expect_true(!summary.model$is.loaded)
+    expect_true(summary2$is.loaded)
+
+    unlink(modelPath)
+  }
+})
+
+test_that("spark.gaussianMixture", {
+  # R code to reproduce the result.
+  # nolint start
+  #' library(mvtnorm)
+  #' library(mixtools)
+  #' set.seed(1)
+  #' a <- rmvnorm(7, c(0, 0))
+  #' b <- rmvnorm(8, c(10, 10))
+  #' data <- rbind(a, b)
+  #' model <- mvnormalmixEM(data, k = 2)
+  #' model$lambda
+  #
+  #  [1] 0.4666667 0.5333333
+  #
+  #' model$mu
+  #
+  #  [1] 0.11731091 -0.06192351
+  #  [1] 10.363673  9.897081
+  #
+  #' model$sigma
+  #
+  #  [[1]]
+  #             [,1]       [,2]
+  #  [1,] 0.62049934 0.06880802
+  #  [2,] 0.06880802 1.27431874
+  #
+  #  [[2]]
+  #            [,1]     [,2]
+  #  [1,] 0.2961543 0.160783
+  #  [2,] 0.1607830 1.008878
+  #
+  #' model$loglik
+  #
+  #  [1] -46.89499
+  # nolint end
+  data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808),
+               list(0.3295078, -0.8204684), list(0.4874291, 0.7383247),
+               list(0.5757814, -0.3053884), list(1.5117812, 0.3898432),
+               list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664),
+               list(9.9838097, 10.9438362), list(10.8212212, 10.5939013),
+               list(10.9189774, 10.7821363), list(10.0745650, 8.0106483),
+               list(10.6198257, 9.9438713), list(9.8442045, 8.5292476),
+               list(9.5218499, 10.4179416))
+  df <- createDataFrame(data, c("x1", "x2"))
+  model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2)
+  stats <- summary(model)
+  rLambda <- c(0.4666667, 0.5333333)
+  rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081)
+  rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874,
+              0.2961543, 0.160783, 0.1607830, 1.008878)
+  rLoglik <- -46.89499
+  expect_equal(stats$lambda, rLambda, tolerance = 1e-3)
+  expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3)
+  expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3)
+  expect_equal(unlist(stats$loglik), rLoglik, tolerance = 1e-3)
+  p <- collect(select(predict(model, df), "prediction"))
+  expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1))
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+    expect_equal(stats$lambda, stats2$lambda)
+    expect_equal(unlist(stats$mu), unlist(stats2$mu))
+    expect_equal(unlist(stats$sigma), unlist(stats2$sigma))
+    expect_equal(unlist(stats$loglik), unlist(stats2$loglik))
+
+    unlink(modelPath)
+  }
+})
+
+test_that("spark.kmeans", {
+  newIris <- iris
+  newIris$Species <- NULL
+  training <- suppressWarnings(createDataFrame(newIris))
+
+  take(training, 1)
+
+  model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random")
+  sample <- take(select(predict(model, training), "prediction"), 1)
+  expect_equal(typeof(sample$prediction), "integer")
+  expect_equal(sample$prediction, 1)
+
+  # Test stats::kmeans is working
+  statsModel <- kmeans(x = newIris, centers = 2)
+  expect_equal(sort(unique(statsModel$cluster)), c(1, 2))
+
+  # Test fitted works on KMeans
+  fitted.model <- fitted(model)
+  expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1))
+
+  # Test summary works on KMeans
+  summary.model <- summary(model)
+  cluster <- summary.model$cluster
+  k <- summary.model$k
+  expect_equal(k, 2)
+  expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
+
+  # test summary coefficients return matrix type
+  expect_true(class(summary.model$coefficients) == "matrix")
+  expect_true(class(summary.model$coefficients[1, ]) == "numeric")
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    summary2 <- summary(model2)
+    expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
+    expect_equal(summary.model$coefficients, summary2$coefficients)
+    expect_true(!summary.model$is.loaded)
+    expect_true(summary2$is.loaded)
+
+    unlink(modelPath)
+  }
+
+  # Test Kmeans on dataset that is sensitive to seed value
+  col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  cols <- as.data.frame(cbind(col1, col2, col3))
+  df <- createDataFrame(cols)
+
+  model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+                         initMode = "random", seed = 1, tol = 1E-5)
+  model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+                         initMode = "random", seed = 22222, tol = 1E-5)
+
+  summary.model1 <- summary(model1)
+  summary.model2 <- summary(model2)
+  cluster1 <- summary.model1$cluster
+  cluster2 <- summary.model2$cluster
+  clusterSize1 <- summary.model1$clusterSize
+  clusterSize2 <- summary.model2$clusterSize
+
+  # The predicted clusters are different
+  expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
+             c(0, 1, 2, 3))
+  expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
+             c(0, 1, 2))
+  expect_equal(clusterSize1, 4)
+  expect_equal(clusterSize2, 3)
+})
+
+test_that("spark.lda with libsvm", {
+  text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
+  model <- spark.lda(text, optimizer = "em")
+
+  stats <- summary(model, 10)
+  isDistributed <- stats$isDistributed
+  logLikelihood <- stats$logLikelihood
+  logPerplexity <- stats$logPerplexity
+  vocabSize <- stats$vocabSize
+  topics <- stats$topicTopTerms
+  weights <- stats$topicTopTermsWeights
+  vocabulary <- stats$vocabulary
+  trainingLogLikelihood <- stats$trainingLogLikelihood
+  logPrior <- stats$logPrior
+
+  expect_true(isDistributed)
+  expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+  expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+  expect_equal(vocabSize, 11)
+  expect_true(is.null(vocabulary))
+  expect_true(trainingLogLikelihood <= 0 & !is.na(trainingLogLikelihood))
+  expect_true(logPrior <= 0 & !is.na(logPrior))
+
+  # Test model save/load
+  if (not_cran_or_windows_with_hadoop()) {
+    modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp")
+    write.ml(model, modelPath)
+    expect_error(write.ml(model, modelPath))
+    write.ml(model, modelPath, overwrite = TRUE)
+    model2 <- read.ml(modelPath)
+    stats2 <- summary(model2)
+
+    expect_true(stats2$isDistributed)
+    expect_equal(logLikelihood, stats2$logLikelihood)
+    expect_equal(logPerplexity, stats2$logPerplexity)
+    expect_equal(vocabSize, stats2$vocabSize)
+    expect_equal(vocabulary, stats2$vocabulary)
+    expect_equal(trainingLogLikelihood, stats2$trainingLogLikelihood)
+    expect_equal(logPrior, stats2$logPrior)
+
+    unlink(modelPath)
+  }
+})
+
+test_that("spark.lda with text input", {
+  skip_on_cran()
+
+  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
+  model <- spark.lda(text, optimizer = "online", features = "value")
+
+  stats <- summary(model)
+  isDistributed <- stats$isDistributed
+  logLikelihood <- stats$logLikelihood
+  logPerplexity <- stats$logPerplexity
+  vocabSize <- stats$vocabSize
+  topics <- stats$topicTopTerms
+  weights <- stats$topicTopTermsWeights
+  vocabulary <- stats$vocabulary
+  trainingLogLikelihood <- stats$trainingLogLikelihood
+  logPrior <- stats$logPrior
+
+  expect_false(isDistributed)
+  expect_true(logLikelihood <= 0 & is.finite(logLikelihood))
+  expect_true(logPerplexity >= 0 & is.finite(logPerplexity))
+  expect_equal(vocabSize, 10)
+  expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")))
+  expect_true(is.na(trainingLogLikelihood))
+  expect_true(is.na(logPrior))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  stats2 <- summary(model2)
+
+  expect_false(stats2$isDistributed)
+  expect_equal(logLikelihood, stats2$logLikelihood)
+  expect_equal(logPerplexity, stats2$logPerplexity)
+  expect_equal(vocabSize, stats2$vocabSize)
+  expect_true(all.equal(vocabulary, stats2$vocabulary))
+  expect_true(is.na(stats2$trainingLogLikelihood))
+  expect_true(is.na(stats2$logPrior))
+
+  unlink(modelPath)
+})
+
+test_that("spark.posterior and spark.perplexity", {
+  skip_on_cran()
+
+  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
+  model <- spark.lda(text, features = "value", k = 3)
+
+  # Assert perplexities are equal
+  stats <- summary(model)
+  logPerplexity <- spark.perplexity(model, text)
+  expect_equal(logPerplexity, stats$logPerplexity)
+
+  # Assert the sum of every topic distribution is equal to 1
+  posterior <- spark.posterior(model, text)
+  local.posterior <- collect(posterior)$topicDistribution
+  expect_equal(length(local.posterior), sum(unlist(local.posterior)))
+})
+
+sparkR.session.stop()
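
As with the other MLlib wrappers tested above, spark.kmeans can also be run as a short standalone example; a minimal sketch mirroring the test, assuming a local SparkR session (createDataFrame rewrites column names containing dots, hence the suppressed warning):

  library(SparkR)
  sparkR.session(master = "local[1]", enableHiveSupport = FALSE)

  training <- suppressWarnings(createDataFrame(iris[, -5]))  # drop Species
  model <- spark.kmeans(training, ~ ., k = 2, maxIter = 10, initMode = "random")
  head(predict(model, training))                             # per-row cluster ids
  summary(model)$k                                           # 2

  sparkR.session.stop()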

