Posted to commits@spark.apache.org by gu...@apache.org on 2020/07/30 06:19:56 UTC
[spark] branch branch-3.0 updated: [SPARK-32478][R][SQL] Error
message to show the schema mismatch in gapply with Arrow vectorization
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 235552a [SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization
235552a is described below
commit 235552aba3a7be42360502f8c164a4f23fe9469f
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu Jul 30 15:16:02 2020 +0900
[SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization
### What changes were proposed in this pull request?
This PR proposes to:
1. Fix the error message shown when the specified output schema does not match the R DataFrame returned by the given function. For example,
```R
df <- createDataFrame(list(list(a=1L, b="2")))
count(gapply(df, "a", function(key, group) { group }, structType("a int, b int")))
```
**Before:**
```
Error in handleErrors(returnStatus, conn) :
...
java.lang.UnsupportedOperationException
...
```
**After:**
```
Error in handleErrors(returnStatus, conn) :
...
java.lang.AssertionError: assertion failed: Invalid schema from gapply(): expected IntegerType, IntegerType, got IntegerType, StringType
...
```
2. Update documentation about the schema matching for `gapply` and `dapply`.
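For contrast with the failing call in item 1, here is a minimal sketch of the matching case. It assumes a local SparkR session with the `arrow` R package installed; the `master` value and the `result` and `n` names are illustrative only. Because the function returns each group unchanged, column `b` stays a string, so the declared schema has to say `b string`.

```R
library(SparkR)

# Assumption: local session with Arrow optimization turned on for SparkR.
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

df <- createDataFrame(list(list(a = 1L, b = "2")))

# The function passes each group through as-is, so the output columns are
# (a: integer, b: string). The declared schema matches those types.
result <- gapply(df, "a", function(key, group) { group }, structType("a int, b string"))

# Trigger execution; with a matching schema the assertion should not fire.
n <- count(result)

sparkR.session.stop()
```

Declaring `b int` here instead reproduces the mismatch from item 1.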
### Why are the changes needed?
To show which schema types do not match, so that users can tell what went wrong.
### Does this PR introduce _any_ user-facing change?
Yes, the error message is updated as shown above, and the documentation is updated.
### How was this patch tested?
Manually tested, and unit tests were added.
Closes #29283 from HyukjinKwon/r-vectorized-error.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
R/pkg/tests/fulltests/test_sparkSQL_arrow.R | 18 ++++++++++++++++++
docs/sparkr.md | 16 ++++++++--------
.../scala/org/apache/spark/sql/execution/objects.scala | 9 ++++++++-
3 files changed, 34 insertions(+), 9 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
index 9797275..16d9376 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R
@@ -312,4 +312,22 @@ test_that("Arrow optimization - unsupported types", {
})
})
+test_that("SPARK-32478: gapply() Arrow optimization - error message for schema mismatch", {
+ skip_if_not_installed("arrow")
+ df <- createDataFrame(list(list(a = 1L, b = "a")))
+
+ conf <- callJMethod(sparkSession, "conf")
+ arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
+
+ callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", "true")
+ tryCatch({
+ expect_error(
+ count(gapply(df, "a", function(key, group) { group }, structType("a int, b int"))),
+ "expected IntegerType, IntegerType, got IntegerType, StringType")
+ },
+ finally = {
+ callJMethod(conf, "set", "spark.sql.execution.arrow.sparkr.enabled", arrowEnabled)
+ })
+})
+
sparkR.session.stop()
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9cdc392..15578fd 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -678,12 +678,12 @@ The current supported minimum version is 0.15.1; however, this might change betw
Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
-To use Arrow when executing these calls, users need to first set the Spark configuration ‘spark.sql.execution.arrow.sparkr.enabled’
-to ‘true’. This is disabled by default.
+To use Arrow when executing these, users need to set the Spark configuration ‘spark.sql.execution.arrow.sparkr.enabled’
+to ‘true’ first. This is disabled by default.
-In addition, optimizations enabled by ‘spark.sql.execution.arrow.sparkr.enabled’ could fallback automatically to non-Arrow optimization
-implementation if an error occurs before the actual computation within Spark during converting a Spark DataFrame to/from an R
-DataFrame.
+Whether the optimization is enabled or not, SparkR produces the same results. In addition, the conversion
+between Spark DataFrame and R DataFrame falls back automatically to non-Arrow optimization implementation
+when the optimization fails for any reasons before the actual computation.
<div data-lang="r" markdown="1">
{% highlight r %}
@@ -710,9 +710,9 @@ collect(gapply(spark_df,
{% endhighlight %}
</div>
-Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
-`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
-small subset of the data.
+Note that even with Arrow, `collect(spark_df)` results in the collection of all records in the DataFrame to
+the driver program and should be done on a small subset of the data. In addition, the specified output schema
+in `gapply(...)` and `dapply(...)` should be matched to the R DataFrame's returned by the given function.
## Supported SQL Types
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 4b2d419..c08db13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -567,7 +567,14 @@ case class FlatMapGroupsInRWithArrowExec(
// binary in a batch due to the limitation of R API. See also ARROW-4512.
val columnarBatchIter = runner.compute(groupedByRKey, -1)
val outputProject = UnsafeProjection.create(output, output)
- columnarBatchIter.flatMap(_.rowIterator().asScala).map(outputProject)
+ val outputTypes = StructType.fromAttributes(output).map(_.dataType)
+
+ columnarBatchIter.flatMap { batch =>
+ val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+ assert(outputTypes == actualDataTypes, "Invalid schema from gapply(): " +
+ s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+ batch.rowIterator().asScala
+ }.map(outputProject)
}
}
}