Posted to commits@spark.apache.org by gu...@apache.org on 2020/07/30 06:19:56 UTC

[spark] branch branch-3.0 updated: [SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 235552a  [SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization
235552a is described below

commit 235552aba3a7be42360502f8c164a4f23fe9469f
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu Jul 30 15:16:02 2020 +0900

    [SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to:
    
    1. Fix the error message shown when the output schema does not match the R DataFrame returned by the given function (a corrected call is sketched after this list). For example,
    
        ```R
        df <- createDataFrame(list(list(a=1L, b="2")))
        count(gapply(df, "a", function(key, group) { group }, structType("a int, b int")))
        ```
    
        **Before:**
    
        ```
        Error in handleErrors(returnStatus, conn) :
          ...
          java.lang.UnsupportedOperationException
    	    ...
        ```
    
        **After:**
    
        ```
        Error in handleErrors(returnStatus, conn) :
         ...
         java.lang.AssertionError: assertion failed: Invalid schema from gapply(): expected IntegerType, IntegerType, got IntegerType, StringType
            ...
        ```
    
    2. Update documentation about the schema matching for `gapply` and `dapply`.
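
    As a reference, here is a minimal sketch of a corrected call, assuming a running
    SparkR session with Arrow enabled: declaring column `b` as a string matches the
    R DataFrame returned by the function, so the new schema check passes.

    ```R
    # Enable Arrow vectorization for SparkR (config key from docs/sparkr.md), e.g.:
    # sparkR.session(sparkConfig = list("spark.sql.execution.arrow.sparkr.enabled" = "true"))
    df <- createDataFrame(list(list(a = 1L, b = "2")))
    # The declared schema "a int, b string" matches the returned R DataFrame,
    # so no assertion error is raised.
    count(gapply(df, "a", function(key, group) { group }, structType("a int, b string")))
    ```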
    
    ### Why are the changes needed?
    
    To show which schemas do not match, and let users know what's going on.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the error message is updated as shown above, and the documentation is updated.
    
    ### How was this patch tested?
    
    Manually tested, and unit tests were added.
    
    Closes #29283 from HyukjinKwon/r-vectorized-error.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 R/pkg/tests/fulltests/test_sparkSQL_arrow.R            | 18 ++++++++++++++++++
 docs/sparkr.md                                         | 16 ++++++++--------
 .../scala/org/apache/spark/sql/execution/objects.scala |  9 ++++++++-
 3 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
index 9797275..16d9376 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -312,4 +312,22 @@ test_that("Arrow optimization - unsupported types", {
   })
 })
 
+test_that("SPARK-32478: gapply() Arrow optimization - error message for schema mismatch", {
+  skip_if_not_installed("arrow")
+  df <- createDataFrame(list(list(a = 1L, b = "a")))
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
+  tryCatch({
+    expect_error(
+      count(gapply(df, "a", function(key, group) { group }, structType("a int, b int"))),
+      "expected IntegerType, IntegerType, got IntegerType, StringType")
+  },
+  finally = {
+    callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
+  })
+})
+
 sparkR.session.stop()
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9cdc392..15578fd 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -678,12 +678,12 @@ The current supported minimum version is 0.15.1; however, this might change betw
 Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
 when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
 via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
-To use Arrow when executing these calls, users need to first set the Spark configuration ‘spark.sql.execution.arrow.sparkr.enabled’
-to ‘true’. This is disabled by default.
+To use Arrow when executing these, users need to set the Spark configuration ‘spark.sql.execution.arrow.sparkr.enabled’
+to ‘true’ first. This is disabled by default.
 
-In addition, optimizations enabled by ‘spark.sql.execution.arrow.sparkr.enabled’ could fallback automatically to non-Arrow optimization
-implementation if an error occurs before the actual computation within Spark during converting a Spark DataFrame to/from an R
-DataFrame.
+Whether the optimization is enabled or not, SparkR produces the same results. In addition, the conversion
+between Spark DataFrame and R DataFrame falls back automatically to the non-Arrow implementation
+when the optimization fails for any reason before the actual computation.
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
@@ -710,9 +710,9 @@ collect(gapply(spark_df,
 {% endhighlight %}
 </div>
 
-Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
-`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
-small subset of the data.
+Note that even with Arrow, `collect(spark_df)` results in the collection of all records in the DataFrame to
+the driver program and should be done on a small subset of the data. In addition, the output schema specified
+in `gapply(...)` and `dapply(...)` should match the R DataFrame returned by the given function.
 
 ## Supported SQL Types
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 4b2d419..c08db13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -567,7 +567,14 @@ case class FlatMapGroupsInRWithArrowExec(
       // binary in a batch due to the limitation of R API. See also ARROW-4512.
       val columnarBatchIter = runner.compute(groupedByRKey, -1)
       val outputProject = UnsafeProjection.create(output, output)
-      columnarBatchIter.flatMap(_.rowIterator().asScala).map(outputProject)
+      val outputTypes = StructType.fromAttributes(output).map(_.dataType)
+
+      columnarBatchIter.flatMap { batch =>
+        val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+        assert(outputTypes == actualDataTypes, "Invalid schema from gapply(): " +
+          s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+        batch.rowIterator().asScala
+      }.map(outputProject)
     }
   }
 }

