Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/02/13 09:21:00 UTC

[jira] [Comment Edited] (SPARK-26858) Vectorized gapplyCollect, Arrow optimization in native R function execution

    [ https://issues.apache.org/jira/browse/SPARK-26858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766941#comment-16766941 ] 

Hyukjin Kwon edited comment on SPARK-26858 at 2/13/19 9:20 AM:
---------------------------------------------------------------

cc [~felixcheung], [~shivaram], [~rxin@databricks.com], [~bryanc], [~viirya]

Actually, I wonder if we should add {{gapplyCollect}} support.
 Here is a rough diagram of {{gapplyCollect}}. I made the chart below to explain why it is tricky and hacky to implement.
 I still see row-by-row operations here and there in {{gapplyCollect}}, but performance is not the focus of this explanation.
{code:java}
|Driver                                                             |Executor
|R side                      |JVM side                              |JVM side                                |R side                                 |

|1. call `gapplyCollect`     |                                      |                                        |                                       |
|                            |                                      |                                        |                                       |
|2.  DataFrame<R: binary> is |                                      |                                        |                                       |
|  set for its output schema |                                      |                                        |                                       |
|  and call `collect`        |                                      |                                        |                                       |
|                            |                                      |                                        |                                       |
|3.                          | Query plan is done and it executes   |                                        |                                       |
|                            |                                      |                                        |                                       |
|4.                          |                                      | Serialize from JVM record to           |                                       |
|                            |                                      |   R record line by line                |                                       |
|                            |                                      |                                        |                                       |
|5.                          |                                      | Send bytes                             | Receive bytes                         |
|                            |                                      |                                        |                                       |
|6.                          |                                      |                                        | Deserializes bytes to an R Data frame |
|                            |                                      |                                        |                                       |
|7.                          |                                      |                                        | Execute R function (with key)         |
|                            |                                      |                                        |                                       |
|8.                          |                                      |                                        | Serializes output R DataFrame         |
|                            |                                      |                                        |                                       |
|9.                          |                                      | Receive bytes                          | Send bytes back                       |
|                            |                                      |                                        |                                       |
|10.                         |                                      | Wrap it with JVM row Row(Array[Byte]); |                                       |
|                            |                                      | each record is an R DataFrame.         |                                       |
|11.                         | Row(Array[Byte]) are collected       |                                        |                                       |
|                            |                                      |                                        |                                       |
|12. Deserializes each as an |                                      |                                        |                                       |
|  R Data Frame              |                                      |                                        |                                       |                       
{code}
The problem is that it uses {{BinaryType}} to wrap and ship the data. {{gapply}} is okay because {{schema}} must be set, as with Pandas Grouped Map UDFs.
 However, {{gapplyCollect}} omits the output schema, so the JVM side doesn't know the schema before execution.

So, steps 8. to 12. above are fine with regular {{gapply}} because each R data frame can be serialized/deserialized independently and combined later:
{code:java}
do.call("rbind", list(df, df, df ...)) 
{code}
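Conceptually, steps 8. to 12. can be sketched as below (a Python stand-in purely for illustration, not Spark's actual code path; plain {{pickle}} stands in for SparkR's R ser/de):

```python
# Sketch of steps 8. to 12.: each group's output "data frame" is shipped
# as an opaque byte blob (BinaryType) and only deserialized and combined
# on the driver. No schema is needed until deserialization time.
import pickle

# Executor side: each group's output (a list of rows here) is
# serialized independently.
group_outputs = [
    [{"id": 1, "v": 0.5}],
    [{"id": 2, "v": 1.5}, {"id": 3, "v": 2.5}],
]
blobs = [pickle.dumps(df) for df in group_outputs]  # like Row(Array[Byte])

# Driver side: deserialize each blob and concatenate, rbind-style.
collected = [row for blob in blobs for row in pickle.loads(blob)]
```

This works precisely because the blobs are opaque: nothing downstream needs to agree on a schema before the bytes arrive.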
However, in the case of Arrow, the schema must be known ahead of time because the Arrow streaming format is basically:
{code:java}
| Schema      |
|-------------|
| Arrow Batch |
|-------------|
| Arrow Batch |
|-------------|
   ...
{code}
(Conceptually, each {{df}} maps to one {{Arrow Batch}} above.)

This Arrow {{Schema}} is always converted from the {{DataFrame}}'s schema, but we don't know it in {{gapplyCollect}} since the data is wrapped in {{BinaryType}}.
 So, we would need a different approach to implement {{gapplyCollect}} with Arrow optimization enabled. Also, the Python Arrow API can read an Arrow table from Arrow batches directly, but the R API does not seem to have an equivalent. If it did, the workaround might be easier.

There are a few ways to work around this problem.

  1. Read the schema from the first {{Arrow Batch}} and use it.
    I was trying this way but realised it's pretty hacky and different from what we do on the Python side.

  2. Map each {{df}} to a whole Arrow streaming format as above.
    This is a different protocol from the Python vectorization.

  3. Send Arrow batches one by one without the {{Schema}}.
    This is also a different protocol from the Python vectorization.

  4. Don't support {{gapplyCollect}} with Arrow optimization, and warn that users should use a {{gapply}} and {{collect}} combination instead (this is the current behaviour).

I tried way 1. just now; it looks possible but pretty hacky, and at least different from the Python-side vectorization.
 For approach 2., I haven't tried it yet, but I am sure it needs a bigger change compared to 1.
Approach 3. might need a smaller change.

So, my conclusion is 4. for now, and to implement one of the other ways above later. I am worried about the maintenance overhead if we go ahead with an approach that differs from the Python side.
I would like to postpone {{gapplyCollect}} with Arrow optimization and start the discussion again when users actually ask for it.

Currently, we just throw an exception if users call {{gapplyCollect}}, telling them to use {{gapply}} and {{collect}} separately.

I am leaving this JIRA as {{Later}} because the workaround is possible and easy anyway.



> Vectorized gapplyCollect, Arrow optimization in native R function execution
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-26858
>                 URL: https://issues.apache.org/jira/browse/SPARK-26858
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SparkR, SQL
>    Affects Versions: 3.0.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>
> Unlike gapply, gapplyCollect requires additional ser/de steps because it can omit the schema, and Spark SQL doesn't know the return type before execution actually happens.
> In the original code path, it's done by using a binary schema. Once gapply is done (SPARK-26761), we can mimic this approach in vectorized gapply to support gapplyCollect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org