Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/02/13 09:21:00 UTC
[jira] [Comment Edited] (SPARK-26858) Vectorized gapplyCollect,
Arrow optimization in native R function execution
[ https://issues.apache.org/jira/browse/SPARK-26858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766941#comment-16766941 ]
Hyukjin Kwon edited comment on SPARK-26858 at 2/13/19 9:20 AM:
---------------------------------------------------------------
cc [~felixcheung], [~shivaram], [~rxin@databricks.com], [~bryanc], [~viirya]
Actually, I wonder if we should add {{gapplyCollect}} support.
Here is a rough diagram of {{gapplyCollect}}. I made the chart below to explain why it's tricky and hacky to implement.
I still see row-by-row operations here and there in {{gapplyCollect}}, but I don't aim to cover the performance aspect here.
{code:java}
|Driver |Executor
|R side |JVM side |JVM side |R side |
|1. call `gapplyCollect` | | | |
| | | | |
|2. DataFrame<R: binary> is | | | |
| set for its output schema | | | |
| and call `collect` | | | |
| | | | |
|3. | Query plan is done and it executes | | |
| | | | |
|4. | | Serialize from JVM record to | |
| | | R record line by line | |
| | | | |
|5. | | Send bytes | Receive bytes |
| | | | |
|6. | | | Deserializes bytes to an R Data frame |
| | | | |
|7. | | | Execute R function (with key) |
| | | | |
|8. | | | Serializes output R DataFrame |
| | | | |
|9. | | Receive bytes | Send bytes back |
| | | | |
|10. | | Wrap it with JVM row Row(Array[Byte]); | |
| | | each record is an R DataFrame. | |
|11. | Row(Array[Byte]) are collected | | |
| | | | |
|12. Deserializes each as an | | | |
| R Data Frame | | | |
{code}
The problem is that it uses {{BinaryType}} to wrap and ship the data. {{gapply}} is okay because {{schema}} must be set, like the Pandas Grouped Map UDF.
However, {{gapplyCollect}} omits the output schema, so the JVM side doesn't know the schema before execution.
So, for steps 8. to 12. above, this approach is fine in the regular {{gapply}} code path because each R Data Frame can be ser/de'd independently and then combined later:
{code:java}
do.call("rbind", list(df, df, df ...))
{code}
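For comparison, here is a minimal Python sketch (hypothetical, not actual Spark code) of that same combine step: each group's output is serialized independently into a self-describing blob, so the driver can deserialize and concatenate without any schema exchanged up front.

```python
import pickle

# Each "group result" is an independently serialized, self-describing blob,
# analogous to the Row(Array[Byte]) records collected in step 11 above.
group_results = [
    pickle.dumps([{"id": 1, "v": 0.5}]),
    pickle.dumps([{"id": 2, "v": 1.5}]),
]

# The driver deserializes each blob without knowing the schema ahead of
# time, then combines them -- the moral equivalent of do.call("rbind", ...).
combined = []
for blob in group_results:
    combined.extend(pickle.loads(blob))

print(combined)  # [{'id': 1, 'v': 0.5}, {'id': 2, 'v': 1.5}]
```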
However, in the case of Arrow, the schema needs to be known ahead of time because the Arrow stream format is basically:
{code:java}
| Schema |
|-------------|
| Arrow Batch |
|-------------|
| Arrow Batch |
|-------------|
...
{code}
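To illustrate why the schema has to lead the stream, here is a hypothetical framing sketch in Python (deliberately simplified, not the actual Arrow wire format): the reader must consume the schema frame first, and no batch is interpretable without it.

```python
import json
import struct

def write_stream(schema, batches):
    """Frame a schema followed by batches, mimicking Arrow's stream layout."""
    out = bytearray()
    for payload in [schema] + batches:
        blob = json.dumps(payload).encode("utf-8")
        # Each frame is length-prefixed; the schema frame always comes first.
        out += struct.pack(">I", len(blob)) + blob
    return bytes(out)

def read_stream(data):
    """The schema MUST be read first; batches are meaningless without it."""
    frames, offset = [], 0
    while offset < len(data):
        (n,) = struct.unpack_from(">I", data, offset)
        frames.append(json.loads(data[offset + 4:offset + 4 + n]))
        offset += 4 + n
    schema, batches = frames[0], frames[1:]
    return schema, batches

schema = {"fields": ["id", "v"]}
stream = write_stream(schema, [[1, 0.5], [2, 1.5]])
print(read_stream(stream))  # ({'fields': ['id', 'v']}, [[1, 0.5], [2, 1.5]])
```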
(Each {{df}} is mapped to each {{Arrow Batch}} above, conceptually.)
This Arrow {{Schema}} is always converted from the {{DataFrame}}'s schema, but we don't know it in {{gapplyCollect}} since the data is wrapped in {{BinaryType}}.
So, we need a different approach to implement {{gapplyCollect}} with Arrow optimization enabled. Also, the Python Arrow API can read an Arrow table from Arrow batches directly, but the R API does not seem to have an equivalent; if it did, the workaround might be easier.
There are a few ways to work around this problem.
1. Read the schema from the first {{Arrow Batch}} and use it.
I tried this way but realised it's pretty hacky and different from what we do on the Python side.
2. Map each {{df}} to a whole Arrow streaming format as above.
This is a different protocol from the Python vectorization.
3. Send Arrow batches one by one, without the {{Schema}}.
This is also a different protocol from the Python vectorization.
4. Don't support {{gapplyCollect}} with Arrow optimization, and warn that users should use a {{collect}} and {{gapply}} combination instead (this is the current behaviour).
I tried approach 1 just now; it looks possible but pretty hacky, and at least different from the Python-side vectorization.
I haven't tried approach 2 yet, but I am sure it needs a bigger change compared to 1.
Approach 3 might need a smaller change.
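A rough sketch of approach 1 (hypothetical names and shapes, not the actual SparkR/JVM code): peek the schema from the first group's output and assume every later group matches it, which is exactly where the hackiness lies.

```python
def collect_with_peeked_schema(group_streams):
    # Each element is a (schema, batches) pair as produced per group; in the
    # real protocol this would be parsed from each group's Arrow stream bytes.
    schema, all_batches = None, []
    for s, batches in group_streams:
        if schema is None:
            schema = s  # peek the schema from the first group
        elif s != schema:
            # Nothing guarantees the groups agree -- part of why this is hacky.
            raise ValueError("group output schema mismatch")
        all_batches.extend(batches)
    return schema, all_batches

groups = [(["id", "v"], [[1, 0.5]]), (["id", "v"], [[2, 1.5]])]
print(collect_with_peeked_schema(groups))  # (['id', 'v'], [[1, 0.5], [2, 1.5]])
```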
So, my conclusion is to go with 4. for now, and implement one of the ways above later if needed. I am worried about the maintenance overhead if we go with an approach different from the Python side.
I would like to postpone {{gapplyCollect}} with Arrow optimization and start the discussion again when users actually ask for it.
Currently, we just throw an exception if users call {{gapplyCollect}} with Arrow optimization, telling them to use {{gapply}} and {{collect}} separately.
I am leaving this JIRA as {{Later}} because the workaround is possible and easy anyway.
> Vectorized gapplyCollect, Arrow optimization in native R function execution
> ---------------------------------------------------------------------------
>
> Key: SPARK-26858
> URL: https://issues.apache.org/jira/browse/SPARK-26858
> Project: Spark
> Issue Type: Sub-task
> Components: SparkR, SQL
> Affects Versions: 3.0.0
> Reporter: Hyukjin Kwon
> Assignee: Hyukjin Kwon
> Priority: Major
>
> Unlike gapply, gapplyCollect requires additional ser/de steps because it can omit the schema, and Spark SQL doesn't know the return type before execution actually happens.
> In the original code path, it's done by using a binary schema. Once gapply is done (SPARK-26761), we can mimic this approach in vectorized gapply to support gapplyCollect.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org