Posted to commits@spark.apache.org by gu...@apache.org on 2020/08/31 08:44:58 UTC
[spark] branch branch-3.0 updated: [SPARK-32747][R][TESTS] Deduplicate configuration set/unset in test_sparkSQL_arrow.R
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 985c593 [SPARK-32747][R][TESTS] Deduplicate configuration set/unset in test_sparkSQL_arrow.R
985c593 is described below
commit 985c593944fcf625bb020fe94189e27aeaab99bd
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Mon Aug 31 17:39:12 2020 +0900
[SPARK-32747][R][TESTS] Deduplicate configuration set/unset in test_sparkSQL_arrow.R
### What changes were proposed in this pull request?
This PR proposes to deduplicate configuration set/unset in `test_sparkSQL_arrow.R`.
Setting `spark.sql.execution.arrow.sparkr.enabled` can be done once globally when the test session is created, instead of setting and restoring it in each test case.
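For illustration, here is the per-test pattern being removed and its session-level replacement, condensed from the diff below (`expected` stands in for each test's expected result):

```r
# Before: every test saved the flag, set it, and restored it around its body.
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
tryCatch({
  expect_equal(collect(createDataFrame(mtcars)), expected)
},
finally = {
  callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})

# After: the flag is set once when the shared test session is created.
sparkSession <- sparkR.session(
  master = sparkRTestMaster,
  enableHiveSupport = FALSE,
  sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
```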
### Why are the changes needed?
To deduplicate the code.
### Does this PR introduce _any_ user-facing change?
No, dev-only
### How was this patch tested?
Manually ran the tests.
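For reference, a minimal sketch of running this suite by hand from R, assuming SparkR is installed from a built Spark checkout and the `arrow` package is available (`"local[1]"` is an example value for `sparkRTestMaster`, the master URL the suite expects to find defined):

```r
library(testthat)
library(SparkR)

# The suite reads this variable when it creates its session.
sparkRTestMaster <- "local[1]"

test_file("R/pkg/tests/fulltests/test_sparkSQL_arrow.R")
```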
Closes #29592 from HyukjinKwon/SPARK-32747.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 2491cf1ae1085832f21545ae563a789c34bdd098)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
R/pkg/tests/fulltests/test_sparkSQL_arrow.R | 201 +++++++++-------------------
1 file changed, 60 insertions(+), 141 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
index 16d9376..0674348 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -19,7 +19,10 @@ library(testthat)
context("SparkSQL Arrow optimization")
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+sparkSession <- sparkR.session(
+ master = sparkRTestMaster,
+ enableHiveSupport = FALSE,
+ sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
test_that("createDataFrame/collect Arrow optimization", {
skip_if_not_installed("arrow")
@@ -35,29 +38,13 @@ test_that("createDataFrame/collect Arrow optimization", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(mtcars)), expected)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ expect_equal(collect(createDataFrame(mtcars)), expected)
})
test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
skip_if_not_installed("arrow")
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
- collect(createDataFrame(mtcars, numPartitions = 1)))
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
+ collect(createDataFrame(mtcars, numPartitions = 1)))
})
test_that("createDataFrame/collect Arrow optimization - type specification", {
@@ -81,13 +68,7 @@ test_that("createDataFrame/collect Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(rdf)), expected)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ expect_equal(collect(createDataFrame(rdf)), expected)
})
test_that("dapply() Arrow optimization", {
@@ -100,34 +81,28 @@ test_that("dapply() Arrow optimization", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- dapply(df,
- function(rdf) {
- stopifnot(is.data.frame(rdf))
- rdf
- },
- schema(df))
- expected <- collect(ret)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- dapply(df,
function(rdf) {
stopifnot(is.data.frame(rdf))
- # mtcars' hp is more than 50.
- stopifnot(all(rdf$hp > 50))
rdf
},
schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- expect_equal(count(ret), nrow(mtcars))
+ expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
+
+ ret <- dapply(df,
+ function(rdf) {
+ stopifnot(is.data.frame(rdf))
+ # mtcars' hp is more than 50.
+ stopifnot(all(rdf$hp > 50))
+ rdf
+ },
+ schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ expect_equal(count(ret), nrow(mtcars))
})
test_that("dapply() Arrow optimization - type specification", {
@@ -154,15 +129,9 @@ test_that("dapply() Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- dapply(df, function(rdf) { rdf }, schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ ret <- dapply(df, function(rdf) { rdf }, schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
})
test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
@@ -170,18 +139,8 @@ test_that("dapply() Arrow optimization - type specification (date and timestamp)
rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
b = as.POSIXct("1990-02-24 12:34:56"))))
df <- createDataFrame(rdf)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- dapply(df, function(rdf) { rdf }, schema(df))
- expect_equal(collect(ret), rdf)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ ret <- dapply(df, function(rdf) { rdf }, schema(df))
+ expect_equal(collect(ret), rdf)
})
test_that("gapply() Arrow optimization", {
@@ -209,28 +168,22 @@ test_that("gapply() Arrow optimization", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "gear",
- function(key, grouped) {
- if (length(key) > 0) {
- stopifnot(is.numeric(key[[1]]))
- }
- stopifnot(is.data.frame(grouped))
- stopifnot(length(colnames(grouped)) == 11)
- # mtcars' hp is more than 50.
- stopifnot(all(grouped$hp > 50))
- grouped
- },
- schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- expect_equal(count(ret), nrow(mtcars))
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ ret <- gapply(df,
+ "gear",
+ function(key, grouped) {
+ if (length(key) > 0) {
+ stopifnot(is.numeric(key[[1]]))
+ }
+ stopifnot(is.data.frame(grouped))
+ stopifnot(length(colnames(grouped)) == 11)
+ # mtcars' hp is more than 50.
+ stopifnot(all(grouped$hp > 50))
+ grouped
+ },
+ schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ expect_equal(count(ret), nrow(mtcars))
})
test_that("gapply() Arrow optimization - type specification", {
@@ -250,26 +203,19 @@ test_that("gapply() Arrow optimization - type specification", {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "false")
tryCatch({
ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
+ "a",
+ function(key, grouped) { grouped }, schema(df))
expected <- collect(ret)
},
finally = {
callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
})
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ ret <- gapply(df,
+ "a",
+ function(key, grouped) { grouped }, schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
})
test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
@@ -277,57 +223,30 @@ test_that("gapply() Arrow optimization - type specification (date and timestamp)
rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
b = as.POSIXct("1990-02-24 12:34:56"))))
df <- createDataFrame(rdf)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
- expect_equal(collect(ret), rdf)
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ ret <- gapply(df,
+ "a",
+ function(key, grouped) { grouped }, schema(df))
+ expect_equal(collect(ret), rdf)
})
test_that("Arrow optimization - unsupported types", {
skip_if_not_installed("arrow")
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
- expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
- expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
- expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
- expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
- "not support nested struct type")
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
+ expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
+ expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
+ expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
+ expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
+ expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
+ "not support nested struct type")
})
test_that("SPARK-32478: gapply() Arrow optimization - error message for schema mismatch", {
skip_if_not_installed("arrow")
df <- createDataFrame(list(list(a = 1L, b = "a")))
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
- tryCatch({
- expect_error(
+ expect_error(
count(gapply(df, "a", function(key, group) { group }, structType("a int, b int"))),
"expected IntegerType, IntegerType, got IntegerType, StringType")
- },
- finally = {
- callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
- })
})
sparkR.session.stop()