Posted to commits@spark.apache.org by gu...@apache.org on 2019/03/11 01:23:40 UTC

[spark] branch master updated: [SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93ea353  [SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR
93ea353 is described below

commit 93ea353cae0ccc187028f79024b239893ebaeb96
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Mar 11 10:23:24 2019 +0900

    [SPARK-26920][R] Deduplicate type checking across Arrow optimization in SparkR
    
    ## What changes were proposed in this pull request?
    
    This PR proposes two things.
    
    1. Deduplicates the type checking logic. While I am here, I checked each type: currently, binary, float, nested struct, array, and map types are not supported (a usage sketch of the consolidated check is included after the per-type notes below).
    
    **For map and nested struct types:**
    
    These are expected to be unsupported by Spark's Arrow optimization itself:
    
    ```
    Exception in thread "serve-Arrow" java.lang.UnsupportedOperationException: Unsupported data type: map<string,double>
    ...
    ```
    ```
    Exception in thread "serve-Arrow" java.lang.UnsupportedOperationException: Unsupported data type:  struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
    ...
    ```
    
    Please see the stack trace below to double-check:
    
    ```
    	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowType(ArrowUtils.scala:56)
    	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowField(ArrowUtils.scala:92)
    	at org.apache.spark.sql.execution.arrow.ArrowUtils$.$anonfun$toArrowSchema$1(ArrowUtils.scala:116)
    	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
    	at scala.collection.Iterator.foreach(Iterator.scala:941)
    	at scala.collection.Iterator.foreach$(Iterator.scala:941)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
    	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
    	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
    	at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
    	at org.apache.spark.sql.execution.arrow.ArrowUtils$.toArrowSchema(ArrowUtils.scala:115)
    	at org.apache.spark.sql.execution.arrow.ArrowBatchStreamWriter.<init>(ArrowConverters.scala:50)
    	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToR$2(Dataset.scala:3215)
    	at org.apache.spark.sql.Dataset.$anonfun$collectAsArrowToR$2$adapted(Dataset.scala:3212)
    ```
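    
    A minimal reproduction sketch along these lines (illustrative only; the query below is an assumption, not taken from the PR):
    
    ```
    # Assumes an active SparkR session with spark.sql.execution.arrow.enabled set
    # to "true"; collecting a DataFrame that contains a map column fails on the
    # JVM side with the UnsupportedOperationException shown above.
    df <- sql("SELECT map('a', 1.0) AS m")
    collect(df)
    ```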
    
    **For float and binary types:**
    
    These produce corrupted values in some cases; this needs to be investigated separately.
    
    **For array type:**
    
    ```
    Error in Table__to_dataframe(x, use_threads = use_threads) :
      cannot handle Array of type list
    ```
    
    This seems to be a limitation of Arrow's R library; it needs to be investigated separately as well.
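    
    Because these checks are now consolidated into a single helper, they can be exercised directly. A sketch mirroring the tests added in this PR (`checkSchemaInArrow` is an internal SparkR helper, so this is how the new test file drives it rather than a public API):
    
    ```
    # The shared helper rejects each unsupported type up front instead of
    # failing midway through the Arrow conversion.
    expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
    expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
    expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
    expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
    expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
                 "not support nested struct type")
    ```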
    
    2. While touching the type specification code across the Arrow optimization, I also move the Arrow-optimization-related tests into a separate file called `test_sparkSQL_arrow.R`.
    
    ## How was this patch tested?
    
    Tests were added, and the changes were also manually tested.
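    
    As a rough illustration, a manual check follows the same pattern as the added tests (a sketch only; it assumes the 'arrow' R package is installed and an active SparkR session whose JVM handle is `sparkSession`, as in the test files):
    
    ```
    conf <- callJMethod(sparkSession, "conf")
    old <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
    
    # Collect once without Arrow to get the reference result.
    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
    expected <- collect(createDataFrame(mtcars))
    
    # Collect again with Arrow enabled, compare, and restore the conf afterwards.
    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
    tryCatch({
      stopifnot(isTRUE(all.equal(collect(createDataFrame(mtcars)), expected)))
    }, finally = {
      callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", old)
    })
    ```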
    
    Closes #23969 from HyukjinKwon/SPARK-26920.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 R/pkg/R/DataFrame.R                         |  32 +--
 R/pkg/R/SQLContext.R                        |  82 ++++----
 R/pkg/R/group.R                             |  17 +-
 R/pkg/R/types.R                             |  35 ++++
 R/pkg/tests/fulltests/test_sparkSQL.R       | 287 -------------------------
 R/pkg/tests/fulltests/test_sparkSQL_arrow.R | 315 ++++++++++++++++++++++++++++
 6 files changed, 392 insertions(+), 376 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 5908a13..9ad64a7 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1182,24 +1182,7 @@ setMethod("collect",
             arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
             if (arrowEnabled) {
               useArrow <- tryCatch({
-                requireNamespace1 <- requireNamespace
-                if (!requireNamespace1("arrow", quietly = TRUE)) {
-                  stop("'arrow' package should be installed.")
-                }
-                # Currenty Arrow optimization does not support raw for now.
-                # Also, it does not support explicit float type set by users.
-                if (inherits(schema(x), "structType")) {
-                  if (any(sapply(schema(x)$fields(),
-                                 function(x) x$dataType.toString() == "FloatType"))) {
-                    stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
-                                "DataFrame does not support FloatType yet."))
-                  }
-                  if (any(sapply(schema(x)$fields(),
-                                 function(x) x$dataType.toString() == "BinaryType"))) {
-                    stop(paste0("Arrow optimization in the conversion from Spark DataFrame to R ",
-                                "DataFrame does not support BinaryType yet."))
-                  }
-                }
+                checkSchemaInArrow(schema(x))
                 TRUE
               }, error = function(e) {
                 warning(paste0("The conversion from Spark DataFrame to R DataFrame was attempted ",
@@ -1495,19 +1478,8 @@ dapplyInternal <- function(x, func, schema) {
 
   arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
   if (arrowEnabled) {
-    requireNamespace1 <- requireNamespace
-    if (!requireNamespace1("arrow", quietly = TRUE)) {
-      stop("'arrow' package should be installed.")
-    }
-    # Currenty Arrow optimization does not support raw for now.
-    # Also, it does not support explicit float type set by users.
     if (inherits(schema, "structType")) {
-      if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
-        stop("Arrow optimization with dapply do not support FloatType yet.")
-      }
-      if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
-        stop("Arrow optimization with dapply do not support BinaryType yet.")
-      }
+      checkSchemaInArrow(schema)
     } else if (is.null(schema)) {
       stop(paste0("Arrow optimization does not support 'dapplyCollect' yet. Please disable ",
                   "Arrow optimization or use 'collect' and 'dapply' APIs instead."))
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 5686912..5928661 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -197,17 +197,40 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
   }
 }
 
-checkTypeRequirementForArrow <- function(dataHead, schema) {
-  # Currenty Arrow optimization does not support raw for now.
-  # Also, it does not support explicit float type set by users. It leads to
-  # incorrect conversion. We will fall back to the path without Arrow optimization.
-  if (any(sapply(dataHead, is.raw))) {
-    stop("Arrow optimization with R DataFrame does not support raw type yet.")
-  }
-  if (inherits(schema, "structType")) {
-    if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
-      stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
+  if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
+    if (is.null(firstRow)) {
+      stopifnot(!is.null(rdd))
+      firstRow <- firstRDD(rdd)
     }
+    names <- if (is.null(schema)) {
+      names(firstRow)
+    } else {
+      as.list(schema)
+    }
+    if (is.null(names)) {
+      names <- lapply(1:length(firstRow), function(x) {
+        paste0("_", as.character(x))
+      })
+    }
+
+    # Spark SQL does not support '.' in column names, so replace it with '_'
+    # TODO(davies): remove this once SPARK-2775 is fixed
+    names <- lapply(names, function(n) {
+      nn <- gsub("[.]", "_", n)
+      if (nn != n) {
+        warning(paste("Use", nn, "instead of", n, "as column name"))
+      }
+      nn
+    })
+
+    types <- lapply(firstRow, infer_type)
+    fields <- lapply(1:length(firstRow), function(i) {
+      structField(names[[i]], types[[i]], TRUE)
+    })
+    schema <- do.call(structType, fields)
+  } else {
+    schema
   }
 }
 
@@ -260,8 +283,9 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
     if (arrowEnabled) {
       useArrow <- tryCatch({
         stopifnot(length(data) > 0)
-        dataHead <- head(data, 1)
-        checkTypeRequirementForArrow(data, schema)
+        firstRow <- do.call(mapply, append(args, head(data, 1)))[[1]]
+        schema <- getSchema(schema, firstRow = firstRow)
+        checkSchemaInArrow(schema)
         fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp")
         tryCatch({
           writeToFileInArrow(fileName, data, numPartitions)
@@ -274,8 +298,6 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
           # File might not be created.
           suppressWarnings(file.remove(fileName))
         })
-
-        firstRow <- do.call(mapply, append(args, dataHead))[[1]]
         TRUE
       },
       error = function(e) {
@@ -318,37 +340,7 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
     stop(paste("unexpected type:", class(data)))
   }
 
-  if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
-    if (is.null(firstRow)) {
-      firstRow <- firstRDD(rdd)
-    }
-    names <- if (is.null(schema)) {
-      names(firstRow)
-    } else {
-      as.list(schema)
-    }
-    if (is.null(names)) {
-      names <- lapply(1:length(firstRow), function(x) {
-        paste("_", as.character(x), sep = "")
-      })
-    }
-
-    # SPAKR-SQL does not support '.' in column name, so replace it with '_'
-    # TODO(davies): remove this once SPARK-2775 is fixed
-    names <- lapply(names, function(n) {
-      nn <- gsub("[.]", "_", n)
-      if (nn != n) {
-        warning(paste("Use", nn, "instead of", n, " as column name"))
-      }
-      nn
-    })
-
-    types <- lapply(firstRow, infer_type)
-    fields <- lapply(1:length(firstRow), function(i) {
-      structField(names[[i]], types[[i]], TRUE)
-    })
-    schema <- do.call(structType, fields)
-  }
+  schema <- getSchema(schema, firstRow, rdd)
 
   stopifnot(class(schema) == "structType")
 
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 32592f9..7b3913c 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -231,22 +231,11 @@ gapplyInternal <- function(x, func, schema) {
   }
   arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
   if (arrowEnabled) {
-    requireNamespace1 <- requireNamespace
-    if (!requireNamespace1("arrow", quietly = TRUE)) {
-      stop("'arrow' package should be installed.")
-    }
-    # Currenty Arrow optimization does not support raw for now.
-    # Also, it does not support explicit float type set by users.
     if (inherits(schema, "structType")) {
-      if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
-        stop("Arrow optimization with gapply do not support FloatType yet.")
-      }
-      if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
-        stop("Arrow optimization with gapply do not support BinaryType yet.")
-      }
+      checkSchemaInArrow(schema)
     } else if (is.null(schema)) {
-      stop(paste0("Arrow optimization does not support gapplyCollect yet. Please use ",
-                  "'collect' and 'gapply' APIs instead."))
+      stop(paste0("Arrow optimization does not support 'gapplyCollect' yet. Please disable ",
+                  "Arrow optimization or use 'collect' and 'gapply' APIs instead."))
     } else {
       stop("'schema' should be DDL-formatted string or structType.")
     }
diff --git a/R/pkg/R/types.R b/R/pkg/R/types.R
index ade0f05..55f7550 100644
--- a/R/pkg/R/types.R
+++ b/R/pkg/R/types.R
@@ -83,3 +83,38 @@ specialtypeshandle <- function(type) {
   }
   returntype
 }
+
+# Helper function that checks supported types in Arrow.
+checkSchemaInArrow <- function(schema) {
+  stopifnot(inherits(schema, "structType"))
+
+  requireNamespace1 <- requireNamespace
+  if (!requireNamespace1("arrow", quietly = TRUE)) {
+    stop("'arrow' package should be installed.")
+  }
+
+  # Both cases below produce a corrupt value for unknown reason. It needs to be investigated.
+  if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+    stop("Arrow optimization in R does not support float type yet.")
+  }
+  if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "BinaryType"))) {
+    stop("Arrow optimization in R does not support binary type yet.")
+  }
+  if (any(sapply(schema$fields(),
+                 function(x) startsWith(x$dataType.toString(),
+                 "ArrayType")))) {
+    stop("Arrow optimization in R does not support array type yet.")
+  }
+
+  # Arrow optimization in Spark does not yet support both cases below.
+  if (any(sapply(schema$fields(),
+                 function(x) startsWith(x$dataType.toString(),
+                 "StructType")))) {
+    stop("Arrow optimization in R does not support nested struct type yet.")
+  }
+  if (any(sapply(schema$fields(),
+                 function(x) startsWith(x$dataType.toString(),
+                 "MapType")))) {
+    stop("Arrow optimization in R does not support map type yet.")
+  }
+}
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 16d423f..c60c951 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -307,80 +307,6 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
-test_that("createDataFrame/collect Arrow optimization", {
-  skip_if_not_installed("arrow")
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    expected <- collect(createDataFrame(mtcars))
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    expect_equal(collect(createDataFrame(mtcars)), expected)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
-  skip_if_not_installed("arrow")
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
-                 collect(createDataFrame(mtcars, numPartitions = 1)))
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("createDataFrame/collect Arrow optimization - type specification", {
-  skip_if_not_installed("arrow")
-  rdf <- data.frame(list(list(a = 1,
-                              b = "a",
-                              c = TRUE,
-                              d = 1.1,
-                              e = 1L,
-                              f = as.Date("1990-02-24"),
-                              g = as.POSIXct("1990-02-24 12:34:56"))))
-
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-  conf <- callJMethod(sparkSession, "conf")
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    expected <- collect(createDataFrame(rdf))
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    expect_equal(collect(createDataFrame(rdf)), expected)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
 test_that("read/write csv as DataFrame", {
   if (windows_with_hadoop()) {
     csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
@@ -3317,105 +3243,6 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 
 })
 
-test_that("dapply() Arrow optimization", {
-  skip_if_not_installed("arrow")
-  df <- createDataFrame(mtcars)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    ret <- dapply(df,
-    function(rdf) {
-      stopifnot(class(rdf) == "data.frame")
-      rdf
-    },
-    schema(df))
-    expected <- collect(ret)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- dapply(df,
-                  function(rdf) {
-                    stopifnot(class(rdf) == "data.frame")
-                    # mtcars' hp is more then 50.
-                    stopifnot(all(rdf$hp > 50))
-                    rdf
-                  },
-                  schema(df))
-    actual <- collect(ret)
-    expect_equal(actual, expected)
-    expect_equal(count(ret), nrow(mtcars))
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("dapply() Arrow optimization - type specification", {
-  skip_if_not_installed("arrow")
-  # Note that regular dapply() seems not supporting date and timestamps
-  # whereas Arrow-optimized dapply() does.
-  rdf <- data.frame(list(list(a = 1,
-                              b = "a",
-                              c = TRUE,
-                              d = 1.1,
-                              e = 1L)))
-  # numPartitions are set to 8 intentionally to test empty partitions as well.
-  df <- createDataFrame(rdf, numPartitions = 8)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    ret <- dapply(df, function(rdf) { rdf }, schema(df))
-    expected <- collect(ret)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- dapply(df, function(rdf) { rdf }, schema(df))
-    actual <- collect(ret)
-    expect_equal(actual, expected)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
-  skip_if_not_installed("arrow")
-  rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
-                              b = as.POSIXct("1990-02-24 12:34:56"))))
-  df <- createDataFrame(rdf)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- dapply(df, function(rdf) { rdf }, schema(df))
-    expect_equal(collect(ret), rdf)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
 test_that("repartition by columns on DataFrame", {
   # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
   # partitions to reduce the number of the tasks to speed up the test. This is particularly
@@ -3645,120 +3472,6 @@ test_that("gapply() and gapplyCollect() on a DataFrame", {
   })
 })
 
-test_that("gapply() Arrow optimization", {
-  skip_if_not_installed("arrow")
-  df <- createDataFrame(mtcars)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    ret <- gapply(df,
-                 "gear",
-                 function(key, grouped) {
-                   if (length(key) > 0) {
-                     stopifnot(is.numeric(key[[1]]))
-                   }
-                   stopifnot(class(grouped) == "data.frame")
-                   grouped
-                 },
-                 schema(df))
-    expected <- collect(ret)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- gapply(df,
-                 "gear",
-                 function(key, grouped) {
-                   if (length(key) > 0) {
-                     stopifnot(is.numeric(key[[1]]))
-                   }
-                   stopifnot(class(grouped) == "data.frame")
-                   stopifnot(length(colnames(grouped)) == 11)
-                   # mtcars' hp is more then 50.
-                   stopifnot(all(grouped$hp > 50))
-                   grouped
-                 },
-                 schema(df))
-    actual <- collect(ret)
-    expect_equal(actual, expected)
-    expect_equal(count(ret), nrow(mtcars))
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("gapply() Arrow optimization - type specification", {
-  skip_if_not_installed("arrow")
-  # Note that regular gapply() seems not supporting date and timestamps
-  # whereas Arrow-optimized gapply() does.
-  rdf <- data.frame(list(list(a = 1,
-                              b = "a",
-                              c = TRUE,
-                              d = 1.1,
-                              e = 1L)))
-  df <- createDataFrame(rdf)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
-  tryCatch({
-    ret <- gapply(df,
-                 "a",
-                 function(key, grouped) { grouped }, schema(df))
-    expected <- collect(ret)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- gapply(df,
-                 "a",
-                 function(key, grouped) { grouped }, schema(df))
-    actual <- collect(ret)
-    expect_equal(actual, expected)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
-test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
-  skip_if_not_installed("arrow")
-  rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
-                              b = as.POSIXct("1990-02-24 12:34:56"))))
-  df <- createDataFrame(rdf)
-
-  conf <- callJMethod(sparkSession, "conf")
-  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
-
-  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
-  tryCatch({
-    ret <- gapply(df,
-                  "a",
-                  function(key, grouped) { grouped }, schema(df))
-    expect_equal(collect(ret), rdf)
-  },
-  finally = {
-    # Resetting the conf back to default value
-    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
-  })
-})
-
 test_that("Window functions on a DataFrame", {
   df <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
                         schema = c("key", "value"))
diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
new file mode 100644
index 0000000..25a6d3c
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -0,0 +1,315 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("SparkSQL Arrow optimization")
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
+test_that("createDataFrame/collect Arrow optimization", {
+  skip_if_not_installed("arrow")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    expected <- collect(createDataFrame(mtcars))
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_equal(collect(createDataFrame(mtcars)), expected)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("createDataFrame/collect Arrow optimization - many partitions (partition order test)", {
+  skip_if_not_installed("arrow")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_equal(collect(createDataFrame(mtcars, numPartitions = 32)),
+                 collect(createDataFrame(mtcars, numPartitions = 1)))
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("createDataFrame/collect Arrow optimization - type specification", {
+  skip_if_not_installed("arrow")
+  rdf <- data.frame(list(list(a = 1,
+                              b = "a",
+                              c = TRUE,
+                              d = 1.1,
+                              e = 1L,
+                              f = as.Date("1990-02-24"),
+                              g = as.POSIXct("1990-02-24 12:34:56"))))
+
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+  conf <- callJMethod(sparkSession, "conf")
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    expected <- collect(createDataFrame(rdf))
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_equal(collect(createDataFrame(rdf)), expected)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("dapply() Arrow optimization", {
+  skip_if_not_installed("arrow")
+  df <- createDataFrame(mtcars)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    ret <- dapply(df,
+    function(rdf) {
+      stopifnot(class(rdf) == "data.frame")
+      rdf
+    },
+    schema(df))
+    expected <- collect(ret)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- dapply(df,
+                  function(rdf) {
+                    stopifnot(class(rdf) == "data.frame")
+                    # mtcars' hp is more than 50.
+                    stopifnot(all(rdf$hp > 50))
+                    rdf
+                  },
+                  schema(df))
+    actual <- collect(ret)
+    expect_equal(actual, expected)
+    expect_equal(count(ret), nrow(mtcars))
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("dapply() Arrow optimization - type specification", {
+  skip_if_not_installed("arrow")
+  # Note that regular dapply() seems not supporting date and timestamps
+  # whereas Arrow-optimized dapply() does.
+  rdf <- data.frame(list(list(a = 1,
+                              b = "a",
+                              c = TRUE,
+                              d = 1.1,
+                              e = 1L)))
+  # numPartitions are set to 8 intentionally to test empty partitions as well.
+  df <- createDataFrame(rdf, numPartitions = 8)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    ret <- dapply(df, function(rdf) { rdf }, schema(df))
+    expected <- collect(ret)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- dapply(df, function(rdf) { rdf }, schema(df))
+    actual <- collect(ret)
+    expect_equal(actual, expected)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("dapply() Arrow optimization - type specification (date and timestamp)", {
+  skip_if_not_installed("arrow")
+  rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
+                              b = as.POSIXct("1990-02-24 12:34:56"))))
+  df <- createDataFrame(rdf)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- dapply(df, function(rdf) { rdf }, schema(df))
+    expect_equal(collect(ret), rdf)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("gapply() Arrow optimization", {
+  skip_if_not_installed("arrow")
+  df <- createDataFrame(mtcars)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    ret <- gapply(df,
+                 "gear",
+                 function(key, grouped) {
+                   if (length(key) > 0) {
+                     stopifnot(is.numeric(key[[1]]))
+                   }
+                   stopifnot(class(grouped) == "data.frame")
+                   grouped
+                 },
+                 schema(df))
+    expected <- collect(ret)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- gapply(df,
+                 "gear",
+                 function(key, grouped) {
+                   if (length(key) > 0) {
+                     stopifnot(is.numeric(key[[1]]))
+                   }
+                   stopifnot(class(grouped) == "data.frame")
+                   stopifnot(length(colnames(grouped)) == 11)
+                   # mtcars' hp is more than 50.
+                   stopifnot(all(grouped$hp > 50))
+                   grouped
+                 },
+                 schema(df))
+    actual <- collect(ret)
+    expect_equal(actual, expected)
+    expect_equal(count(ret), nrow(mtcars))
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("gapply() Arrow optimization - type specification", {
+  skip_if_not_installed("arrow")
+  # Note that regular gapply() seems not supporting date and timestamps
+  # whereas Arrow-optimized gapply() does.
+  rdf <- data.frame(list(list(a = 1,
+                              b = "a",
+                              c = TRUE,
+                              d = 1.1,
+                              e = 1L)))
+  df <- createDataFrame(rdf)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
+    ret <- gapply(df,
+                 "a",
+                 function(key, grouped) { grouped }, schema(df))
+    expected <- collect(ret)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- gapply(df,
+                 "a",
+                 function(key, grouped) { grouped }, schema(df))
+    actual <- collect(ret)
+    expect_equal(actual, expected)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("gapply() Arrow optimization - type specification (date and timestamp)", {
+  skip_if_not_installed("arrow")
+  rdf <- data.frame(list(list(a = as.Date("1990-02-24"),
+                              b = as.POSIXct("1990-02-24 12:34:56"))))
+  df <- createDataFrame(rdf)
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    ret <- gapply(df,
+                  "a",
+                  function(key, grouped) { grouped }, schema(df))
+    expect_equal(collect(ret), rdf)
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+test_that("Arrow optimization - unsupported types", {
+  skip_if_not_installed("arrow")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "true")
+  tryCatch({
+    expect_error(checkSchemaInArrow(structType("a FLOAT")), "not support float type")
+    expect_error(checkSchemaInArrow(structType("a BINARY")), "not support binary type")
+    expect_error(checkSchemaInArrow(structType("a ARRAY<INT>")), "not support array type")
+    expect_error(checkSchemaInArrow(structType("a MAP<INT, INT>")), "not support map type")
+    expect_error(checkSchemaInArrow(structType("a STRUCT<a: INT>")),
+                 "not support nested struct type")
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
+  })
+})
+
+sparkR.session.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org