Posted to commits@spark.apache.org by gu...@apache.org on 2019/05/02 01:02:44 UTC

[spark] branch master updated: [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3670826  [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs
3670826 is described below

commit 3670826af6f40bf8cd6c6c850515d6c2f0a83519
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Thu May 2 10:02:14 2019 +0900

    [SPARK-26921][R][DOCS] Document Arrow optimization and vectorized R APIs
    
    ## What changes were proposed in this pull request?
    
    This PR adds documentation for Arrow optimization in SparkR.
    
    Note that the CRAN issue on the Arrow side does not look likely to be fixed soon, IMHO, even after Spark 3.0.
    If it happens to be fixed, I will update this doc later.
    
    Another note is that the Arrow R package itself requires R 3.5+. So, I intentionally didn't note this.
    
    ## How was this patch tested?
    
    Manually built and checked.
    
    Closes #24506 from HyukjinKwon/SPARK-26924.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 docs/sparkr.md                                     | 59 ++++++++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 18 +++++--
 2 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 26eeae6..d6b5179 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -657,6 +657,65 @@ The following example shows how to save/load a MLlib model by SparkR.
 
 SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information see the R API on the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html)
 
+# Apache Arrow in SparkR
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and R processes. For the corresponding PySpark optimization, see the [PySpark Usage Guide for Pandas with Apache Arrow](sql-pyspark-pandas-with-arrow.html). This guide explains how to use Arrow optimization in SparkR, with a few key points.
+
+## Ensure Arrow Installed
+
+Currently, the Arrow R library is not yet on CRAN ([ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204)). Therefore, it should be installed directly from GitHub. You can use `remotes::install_github` as below.
+
+```bash
+Rscript -e 'remotes::install_github("apache/arrow@TAG", subdir = "r")'
+```
+
+`TAG` is a version tag that can be checked on the [Arrow releases page on GitHub](https://github.com/apache/arrow/releases). You must ensure that the Arrow R package is installed and available on all cluster nodes. The currently supported version is 0.12.1.
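+
+As a quick sanity check (a minimal sketch, not part of the installation command above), you can confirm on a node that the package loads and reports the expected version:
+
+{% highlight r %}
+# Confirm the Arrow R package is installed and importable on this node
+if (!requireNamespace("arrow", quietly = TRUE)) {
+  stop("The 'arrow' package is not installed on this node")
+}
+packageVersion("arrow")  # expected to report 0.12.1
+{% endhighlight %}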
+
+## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`
+
+Arrow optimization is available when creating a Spark DataFrame from an R DataFrame using the call `createDataFrame(r_df)`,
+when converting a Spark DataFrame to an R DataFrame with `collect(spark_df)`, when applying an R native function to each partition
+via `dapply(...)`, and when applying an R native function to grouped data via `gapply(...)`.
+To use Arrow when executing these calls, users need to first set the Spark configuration `spark.sql.execution.arrow.enabled`
+to `true`. This is disabled by default.
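+
+The configuration can also be changed on a running session, for example via a SQL `SET` command (a minimal sketch; setting it at session start, as in the example further below, works equally well):
+
+{% highlight r %}
+# Toggle the Arrow optimization on an existing SparkR session
+sql("SET spark.sql.execution.arrow.enabled=true")
+{% endhighlight %}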
+
+In addition, optimizations enabled by `spark.sql.execution.arrow.enabled` can automatically fall back to the non-Arrow
+implementation if an error occurs before the actual computation within Spark while converting a Spark DataFrame to/from an R
+DataFrame.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Start up spark session with Arrow optimization enabled
+sparkR.session(master = "local[*]",
+               sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))
+
+# Creates a Spark DataFrame from an R DataFrame
+spark_df <- createDataFrame(mtcars)
+
+# Converts the Spark DataFrame to an R DataFrame
+collect(spark_df)
+
+# Apply an R native function to each partition.
+collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
+
+# Apply an R native function to grouped data.
+collect(gapply(spark_df,
+               "gear",
+               function(key, group) {
+                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
+               },
+               structType("gear double, disp boolean")))
+{% endhighlight %}
+</div>
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
+`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
+small subset of the data.
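+
+For example, a minimal sketch that bounds how much data is brought back to the driver (using the SparkR `limit` and `head` helpers):
+
+{% highlight r %}
+# Collect only a bounded subset of rows to the driver
+small_rdf <- collect(limit(spark_df, 100))
+
+# head() similarly returns at most the first n rows as an R data.frame
+head(spark_df, 10)
+{% endhighlight %}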
+
+## Supported SQL Types
+
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `FloatType`, `BinaryType`, `ArrayType`, `StructType` and `MapType`.
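+
+If a DataFrame contains a column of one of these unsupported types, one possible workaround (a hypothetical sketch, not part of this patch; `float_col` is an assumed column name) is to cast that column to a supported type before collecting, so that the Arrow path can still be used:
+
+{% highlight r %}
+# float_col is assumed to be a FloatType column; cast it to double, which Arrow supports
+spark_df2 <- withColumn(spark_df, "float_col", cast(spark_df$float_col, "double"))
+collect(spark_df2)
+{% endhighlight %}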
+
 # R Function Name Conflicts
 
 When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 87bce1f..7f577f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1326,12 +1326,20 @@ object SQLConf {
 
   val ARROW_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.enabled")
-      .doc("When true, make use of Apache Arrow for columnar data transfers. Currently available " +
-        "for use with pyspark.sql.DataFrame.toPandas, " +
-        "pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame, " +
-        "and createDataFrame when its input is an R DataFrame. " +
+      .doc("When true, make use of Apache Arrow for columnar data transfers." +
+        "In case of PySpark, " +
+        "1. pyspark.sql.DataFrame.toPandas " +
+        "2. pyspark.sql.SparkSession.createDataFrame when its input is a Pandas DataFrame " +
         "The following data types are unsupported: " +
-        "BinaryType, MapType, ArrayType of TimestampType, and nested StructType.")
+        "BinaryType, MapType, ArrayType of TimestampType, and nested StructType." +
+
+        "In case of SparkR," +
+        "1. createDataFrame when its input is an R DataFrame " +
+        "2. collect " +
+        "3. dapply " +
+        "4. gapply " +
+        "The following data types are unsupported: " +
+        "FloatType, BinaryType, ArrayType, StructType and MapType.")
       .booleanConf
       .createWithDefault(false)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org