Posted to commits@spark.apache.org by gu...@apache.org on 2019/03/11 01:23:40 UTC
[spark] branch master updated: [SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 93ea353 [SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR
93ea353 is described below
commit 93ea353cae0ccc187028f79024b239893ebaeb96
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Mar 11 10:23:24 2019 +0900
[SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR
## What changes were proposed in this pull request?
This PR proposes two things.
1. Deduplicates the type-checking logic into a single helper. While I am here, I checked each type: currently binary, float, nested struct, map, and array types are not supported. Per-type notes follow, with a usage sketch of the shared check after them.
**For map and nested struct types:**
These are expected to be unsupported by Spark's Arrow optimization itself:
```
Exception in thread "serve-Arrow" java.lang.UnsupportedOperationException: Unsupported data type: map<string,double>
...
```
```
Exception in thread "serve-Arrow" java.lang.UnsupportedOperationException: Unsupported data type: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
...
```
See the stack trace below to double-check:
```
at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
at org.apache.spark.sql.execution.arrow.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:116)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
at org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter.<init>(ArrowConverters.scala:50)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToR$2(Dataset.scala:3215)
at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToR$2$adapted(Dataset.scala:3212)
```
**For float and binary types:**
They produce corrupt values in some cases; this needs to be investigated separately.
**For array type:**
```
Error in Table__to_dataframe(x, use_threads = use_threads) :
cannot handle Array of type list
```
This appears to be a limitation of Arrow's R library and also needs to be investigated separately.
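For reference, a minimal sketch of how the deduplicated check behaves. This is illustrative only: `checkSchemaInArrow` is an internal helper (hence the `:::` access), `structType` needs a running Spark session, and the `arrow` R package must be installed or the helper stops earlier with a different message.
```
library(SparkR)
sparkR.session(master = "local[1]")

# structType() accepts a DDL-formatted string, as used in the new tests.
floatSchema <- structType("a FLOAT")

# The shared helper rejects unsupported types up front:
tryCatch(
  SparkR:::checkSchemaInArrow(floatSchema),
  error = function(e) message(conditionMessage(e))
)
# Arrow optimization in R does not support float type yet.
```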
2. While I am touching the type-specification code across the Arrow optimization, I also move the Arrow-related tests into a separate file, `test_sparkSQL_arrow.R` (see the diff below).
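To run just the relocated tests, a hypothetical standalone invocation like the following should work from the Spark source root (normally these run through the R/pkg test harness, which defines `sparkRTestMaster`):
```
# Hypothetical standalone run; the tests skip themselves when the
# 'arrow' package is not installed.
library(testthat)
library(SparkR)
sparkRTestMaster <- "local[1]"  # normally provided by the test harness
testthat::test_file("R/pkg/tests/fulltests/test_sparkSQL_arrow.R")
```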
## How was this patch tested?
Tests were added, and the change was also tested manually.
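Every test in the new file follows the same toggle-and-restore pattern around `spark.sql.execution.arrow.enabled`; condensed from the diff below, the skeleton looks like this:
```
# Flip the conf, run the assertion, and always restore the original value.
conf <- callJMethod(sparkSession, "conf")
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]

callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
tryCatch({
  expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
               collect(createDataFrame(mtcars, numPartitions = 1)))
},
finally = {
  # Reset the conf even if the assertion fails.
  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})
```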
Closes #23969 from HyukjinKwon/SPARK-26920.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
R/pkg/R/DataFrame.R | 32 +--
R/pkg/R/SQLContext.R | 82 ++++----
R/pkg/R/group.R | 17 +-
R/pkg/R/types.R | 35 ++++
R/pkg/tests/fulltests/test_sparkSQL.R | 287 -------------------------
R/pkg/tests/fulltests/test_sparkSQL_arrow.R | 315 ++++++++++++++++++++++++++++
6 files changed, 392 insertions(+), 376 deletions(-)
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5908a13..9ad64a7 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1182,24 +1182,7 @@ setMethod("collect",
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
if (arrowEnabled) {
useArrow <- tryCatch({
- requireNamespace1 <- requireNamespace
- if (!requireNamespace1("arrow", quietly = TRUE)) {
- stop("'arrow' package should be installed.")
- }
- # Currenty Arrow optimization does not support raw for now.
- # Also, it does not support explicit float type set by users.
- if (inherits(schema(x), "structType")) {
- if (any(sapply(schema(x)$fields(),
- function(x) x$dataType.toString() == "FloatType"))) {
- stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
- "DataFrame does not support FloatType yet."))
- }
- if (any(sapply(schema(x)$fields(),
- function(x) x$dataType.toString() == "BinaryType"))) {
- stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
- "DataFrame does not support BinaryType yet."))
- }
- }
+ checkSchemaInArrow(schema(x))
TRUE
}, error = function(e) {
warning(paste0("The conversion from Spark DataFrame to R DataFrame was attempted ",
@@ -1495,19 +1478,8 @@ dapplyInternal <- function(x, func, schema) {
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
if (arrowEnabled) {
- requireNamespace1 <- requireNamespace
- if (!requireNamespace1("arrow", quietly = TRUE)) {
- stop("'arrow' package should be installed.")
- }
- # Currenty Arrow optimization does not support raw for now.
- # Also, it does not support explicit float type set by users.
if (inherits(schema, "structType")) {
- if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
- stop("Arrow optimization with dapply do not support FloatType yet.")
- }
- if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
- stop("Arrow optimization with dapply do not support BinaryType yet.")
- }
+ checkSchemaInArrow(schema)
} else if (is.null(schema)) {
stop(paste0("Arrow optimization does not support 'dapplyCollect' yet. Please disable ",
"Arrow optimization or use 'collect' and 'dapply' APIs instead."))
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 5686912..5928661 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -197,17 +197,40 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
}
}
-checkTypeRequirementForArrow <- function(dataHead, schema) {
- # Currenty Arrow optimization does not support raw for now.
- # Also, it does not support explicit float type set by users. It leads to
- # incorrect conversion. We will fall back to the path without Arrow optimization.
- if (any(sapply(dataHead, is.raw))) {
- stop("Arrow optimization with R DataFrame does not support raw type yet.")
- }
- if (inherits(schema, "structType")) {
- if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
- stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
+ if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
+ if (is.null(firstRow)) {
+ stopifnot(!is.null(rdd))
+ firstRow <- firstRDD(rdd)
}
+ names <- if (is.null(schema)) {
+ names(firstRow)
+ } else {
+ as.list(schema)
+ }
+ if (is.null(names)) {
+ names <- lapply(1:length(firstRow), function(x) {
+ paste0("_", as.character(x))
+ })
+ }
+
+ # SPAKR-SQL does not support '.' in column name, so replace it with '_'
+ # TODO(davies): remove this once SPARK-2775 is fixed
+ names <- lapply(names, function(n) {
+ nn <- gsub("[.]", "_", n)
+ if (nn != n) {
+ warning(paste("Use", nn, "instead of", n, "as column name"))
+ }
+ nn
+ })
+
+ types <- lapply(firstRow, infer_type)
+ fields <- lapply(1:length(firstRow), function(i) {
+ structField(names[[i]], types[[i]], TRUE)
+ })
+ schema <- do.call(structType, fields)
+ } else {
+ schema
}
}
@@ -260,8 +283,9 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
if (arrowEnabled) {
useArrow <- tryCatch({
stopifnot(length(data) > 0)
- dataHead <- head(data, 1)
- checkTypeRequirementForArrow(data, schema)
+ firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]]
+ schema <- getSchema(schema, firstRow = firstRow)
+ checkSchemaInArrow(schema)
fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp")
tryCatch({
writeToFileInArrow(fileName, data, numPartitions)
@@ -274,8 +298,6 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
# File might not be created.
suppressWarnings(file.remove(fileName))
})
-
- firstRow <- do.call(mapply, append(args, dataHead))[[1]]
TRUE
},
error = function(e) {
@@ -318,37 +340,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
stop(paste("unexpected type:", class(data)))
}
- if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
- if (is.null(firstRow)) {
- firstRow <- firstRDD(rdd)
- }
- names <- if (is.null(schema)) {
- names(firstRow)
- } else {
- as.list(schema)
- }
- if (is.null(names)) {
- names <- lapply(1:length(firstRow), function(x) {
- paste("_", as.character(x), sep = "")
- })
- }
-
- # SPAKR-SQL does not support '.' in column name, so replace it with '_'
- # TODO(davies): remove this once SPARK-2775 is fixed
- names <- lapply(names, function(n) {
- nn <- gsub("[.]", "_", n)
- if (nn != n) {
- warning(paste("Use", nn, "instead of", n, " as column name"))
- }
- nn
- })
-
- types <- lapply(firstRow, infer_type)
- fields <- lapply(1:length(firstRow), function(i) {
- structField(names[[i]], types[[i]], TRUE)
- })
- schema <- do.call(structType, fields)
- }
+ schema <- getSchema(schema, firstRow, rdd)
stopifnot(class(schema) == "structType")
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 32592f9..7b3913c 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -231,22 +231,11 @@ gapplyInternal <- function(x, func, schema) {
}
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
if (arrowEnabled) {
- requireNamespace1 <- requireNamespace
- if (!requireNamespace1("arrow", quietly = TRUE)) {
- stop("'arrow' package should be installed.")
- }
- # Currenty Arrow optimization does not support raw for now.
- # Also, it does not support explicit float type set by users.
if (inherits(schema, "structType")) {
- if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
- stop("Arrow optimization with gapply do not support FloatType yet.")
- }
- if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
- stop("Arrow optimization with gapply do not support BinaryType yet.")
- }
+ checkSchemaInArrow(schema)
} else if (is.null(schema)) {
- stop(paste0("Arrow optimization does not support gapplyCollect yet. Please use ",
- "'collect' and 'gapply' APIs instead."))
+ stop(paste0("Arrow optimization does not support 'gapplyCollect' yet. Please disable ",
+ "Arrow optimization or use 'collect' and 'gapply' APIs instead."))
} else {
stop("'schema' should be DDL-formatted string or structType.")
}
diff --git a/R/pkg/R/types.R b/R/pkg/R/types.R
index ade0f05..55f7550 100644
--- a/R/pkg/R/types.R
+++ b/R/pkg/R/types.R
@@ -83,3 +83,38 @@ specialtypeshandle <- function(type) {
}
returntype
}
+
+# Helper function that checks supported types in Arrow.
+checkSchemaInArrow <- function(schema) {
+ stopifnot(inherits(schema, "structType"))
+
+ requireNamespace1 <- requireNamespace
+ if (!requireNamespace1("arrow", quietly = TRUE)) {
+ stop("'arrow' package should be installed.")
+ }
+
+ # Both cases below produce a corrupt value for unknown reason. It needs to be investigated.
+ if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+ stop("Arrow optimization in R does not support float type yet.")
+ }
+ if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
+ stop("Arrow optimization in R does not support binary type yet.")
+ }
+ if (any(sapply(schema$fields(),
+ function(x) startsWith(x$dataType.toString(),
+ "ArrayType")))) {
+ stop("Arrow optimization in R does not support array type yet.")
+ }
+
+ # Arrow optimization in Spark does not yet support both cases below.
+ if (any(sapply(schema$fields(),
+ function(x) startsWith(x$dataType.toString(),
+ "StructType")))) {
+ stop("Arrow optimization in R does not support nested struct type yet.")
+ }
+ if (any(sapply(schema$fields(),
+ function(x) startsWith(x$dataType.toString(),
+ "MapType")))) {
+ stop("Arrow optimization in R does not support map type yet.")
+ }
+}
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 16d423f..c60c951 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -307,80 +307,6 @@ test_that("create DataFrame from RDD", {
unsetHiveContext()
})
-test_that("createDataFrame/collect Arrow optimization", {
- skip_if_not_installed("arrow")
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- expected <- collect(createDataFrame(mtcars))
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(mtcars)), expected)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
- skip_if_not_installed("arrow")
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
- collect(createDataFrame(mtcars, numPartitions = 1)))
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("createDataFrame/collect Arrow optimization - type specification", {
- skip_if_not_installed("arrow")
- rdf <- data.frame(list(list(a = 1,
- b = "a",
- c = TRUE,
- d = 1.1,
- e = 1L,
- f = as.Date("1990-02-24"),
- g = as.POSIXct("1990-02-24 12:34:56"))))
-
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
- conf <- callJMethod(sparkSession, "conf")
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- expected <- collect(createDataFrame(rdf))
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- expect_equal(collect(createDataFrame(rdf)), expected)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
test_that("read/write csv as DataFrame", {
if (windows_with_hadoop()) {
csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
@@ -3317,105 +3243,6 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
})
-test_that("dapply() Arrow optimization", {
- skip_if_not_installed("arrow")
- df <- createDataFrame(mtcars)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- ret <- dapply(df,
- function(rdf) {
- stopifnot(class(rdf) == "data.frame")
- rdf
- },
- schema(df))
- expected <- collect(ret)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- dapply(df,
- function(rdf) {
- stopifnot(class(rdf) == "data.frame")
- # mtcars' hp is more then 50.
- stopifnot(all(rdf$hp > 50))
- rdf
- },
- schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- expect_equal(count(ret), nrow(mtcars))
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("dapply() Arrow optimization - type specification", {
- skip_if_not_installed("arrow")
- # Note that regular dapply() seems not supporting date and timestamps
- # whereas Arrow-optimized dapply() does.
- rdf <- data.frame(list(list(a = 1,
- b = "a",
- c = TRUE,
- d = 1.1,
- e = 1L)))
- # numPartitions are set to 8 intentionally to test empty partitions as well.
- df <- createDataFrame(rdf, numPartitions = 8)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- ret <- dapply(df, function(rdf) { rdf }, schema(df))
- expected <- collect(ret)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- dapply(df, function(rdf) { rdf }, schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
- skip_if_not_installed("arrow")
- rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
- b = as.POSIXct("1990-02-24 12:34:56"))))
- df <- createDataFrame(rdf)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- dapply(df, function(rdf) { rdf }, schema(df))
- expect_equal(collect(ret), rdf)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
test_that("repartition by columns on DataFrame", {
# The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
# partitions to reduce the number of the tasks to speed up the test. This is particularly
@@ -3645,120 +3472,6 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
})
})
-test_that("gapply() Arrow optimization", {
- skip_if_not_installed("arrow")
- df <- createDataFrame(mtcars)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- ret <- gapply(df,
- "gear",
- function(key, grouped) {
- if (length(key) > 0) {
- stopifnot(is.numeric(key[[1]]))
- }
- stopifnot(class(grouped) == "data.frame")
- grouped
- },
- schema(df))
- expected <- collect(ret)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "gear",
- function(key, grouped) {
- if (length(key) > 0) {
- stopifnot(is.numeric(key[[1]]))
- }
- stopifnot(class(grouped) == "data.frame")
- stopifnot(length(colnames(grouped)) == 11)
- # mtcars' hp is more then 50.
- stopifnot(all(grouped$hp > 50))
- grouped
- },
- schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- expect_equal(count(ret), nrow(mtcars))
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("gapply() Arrow optimization - type specification", {
- skip_if_not_installed("arrow")
- # Note that regular gapply() seems not supporting date and timestamps
- # whereas Arrow-optimized gapply() does.
- rdf <- data.frame(list(list(a = 1,
- b = "a",
- c = TRUE,
- d = 1.1,
- e = 1L)))
- df <- createDataFrame(rdf)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
- tryCatch({
- ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
- expected <- collect(ret)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
- actual <- collect(ret)
- expect_equal(actual, expected)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
-test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
- skip_if_not_installed("arrow")
- rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
- b = as.POSIXct("1990-02-24 12:34:56"))))
- df <- createDataFrame(rdf)
-
- conf <- callJMethod(sparkSession, "conf")
- arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
- tryCatch({
- ret <- gapply(df,
- "a",
- function(key, grouped) { grouped }, schema(df))
- expect_equal(collect(ret), rdf)
- },
- finally = {
- # Resetting the conf back to default value
- callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
- })
-})
-
test_that("Window functions on a DataFrame", {
df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
schema = c("key", "value"))
diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
new file mode 100644
index 0000000..25a6d3c
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("SparkSQL Arrow optimization")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("createDataFrame/collect Arrow optimization", {
+ skip_if_not_installed("arrow")
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ expected <- collect(createDataFrame(mtcars))
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ expect_equal(collect(createDataFrame(mtcars)), expected)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
+ skip_if_not_installed("arrow")
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
+ collect(createDataFrame(mtcars, numPartitions = 1)))
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("createDataFrame/collect Arrow optimization - type specification", {
+ skip_if_not_installed("arrow")
+ rdf <- data.frame(list(list(a = 1,
+ b = "a",
+ c = TRUE,
+ d = 1.1,
+ e = 1L,
+ f = as.Date("1990-02-24"),
+ g = as.POSIXct("1990-02-24 12:34:56"))))
+
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+ conf <- callJMethod(sparkSession, "conf")
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ expected <- collect(createDataFrame(rdf))
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ expect_equal(collect(createDataFrame(rdf)), expected)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("dapply() Arrow optimization", {
+ skip_if_not_installed("arrow")
+ df <- createDataFrame(mtcars)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ ret <- dapply(df,
+ function(rdf) {
+ stopifnot(class(rdf) == "data.frame")
+ rdf
+ },
+ schema(df))
+ expected <- collect(ret)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- dapply(df,
+ function(rdf) {
+ stopifnot(class(rdf) == "data.frame")
+ # mtcars' hp is more then 50.
+ stopifnot(all(rdf$hp > 50))
+ rdf
+ },
+ schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ expect_equal(count(ret), nrow(mtcars))
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("dapply() Arrow optimization - type specification", {
+ skip_if_not_installed("arrow")
+ # Note that regular dapply() seems not supporting date and timestamps
+ # whereas Arrow-optimized dapply() does.
+ rdf <- data.frame(list(list(a = 1,
+ b = "a",
+ c = TRUE,
+ d = 1.1,
+ e = 1L)))
+ # numPartitions are set to 8 intentionally to test empty partitions as well.
+ df <- createDataFrame(rdf, numPartitions = 8)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ ret <- dapply(df, function(rdf) { rdf }, schema(df))
+ expected <- collect(ret)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- dapply(df, function(rdf) { rdf }, schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
+ skip_if_not_installed("arrow")
+ rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
+ b = as.POSIXct("1990-02-24 12:34:56"))))
+ df <- createDataFrame(rdf)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- dapply(df, function(rdf) { rdf }, schema(df))
+ expect_equal(collect(ret), rdf)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("gapply() Arrow optimization", {
+ skip_if_not_installed("arrow")
+ df <- createDataFrame(mtcars)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ ret <- gapply(df,
+ "gear",
+ function(key, grouped) {
+ if (length(key) > 0) {
+ stopifnot(is.numeric(key[[1]]))
+ }
+ stopifnot(class(grouped) == "data.frame")
+ grouped
+ },
+ schema(df))
+ expected <- collect(ret)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- gapply(df,
+ "gear",
+ function(key, grouped) {
+ if (length(key) > 0) {
+ stopifnot(is.numeric(key[[1]]))
+ }
+ stopifnot(class(grouped) == "data.frame")
+ stopifnot(length(colnames(grouped)) == 11)
+ # mtcars' hp is more then 50.
+ stopifnot(all(grouped$hp > 50))
+ grouped
+ },
+ schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ expect_equal(count(ret), nrow(mtcars))
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("gapply() Arrow optimization - type specification", {
+ skip_if_not_installed("arrow")
+ # Note that regular gapply() seems not supporting date and timestamps
+ # whereas Arrow-optimized gapply() does.
+ rdf <- data.frame(list(list(a = 1,
+ b = "a",
+ c = TRUE,
+ d = 1.1,
+ e = 1L)))
+ df <- createDataFrame(rdf)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+ tryCatch({
+ ret <- gapply(df,
+ "a",
+ function(key, grouped) { grouped }, schema(df))
+ expected <- collect(ret)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- gapply(df,
+ "a",
+ function(key, grouped) { grouped }, schema(df))
+ actual <- collect(ret)
+ expect_equal(actual, expected)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
+ skip_if_not_installed("arrow")
+ rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
+ b = as.POSIXct("1990-02-24 12:34:56"))))
+ df <- createDataFrame(rdf)
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ ret <- gapply(df,
+ "a",
+ function(key, grouped) { grouped }, schema(df))
+ expect_equal(collect(ret), rdf)
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+test_that("Arrow optimization - unsupported types", {
+ skip_if_not_installed("arrow")
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+ tryCatch({
+ expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
+ expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
+ expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
+ expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
+ expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
+ "not support nested struct type")
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+ })
+})
+
+sparkR.session.stop()