Posted to jira@arrow.apache.org by "Jorge Leitão (Jira)" <ji...@apache.org> on 2020/12/29 06:29:00 UTC

[jira] [Commented] (ARROW-11058) [Rust] [DataFusion] Implement "coalesce batches" operator

    [ https://issues.apache.org/jira/browse/ARROW-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17255835#comment-17255835 ] 

Jorge Leitão commented on ARROW-11058:
--------------------------------------

This aspect of DataFusion is a bit unclear atm: in DataFusion, it seems that we have two types of "buckets", parts and batches, while in Spark there are only parts (via partitioning). In Spark, the partitioning tradeoff is higher parallelism vs. slower exchanges, but I can't find the equivalent tradeoff for the number of batches per part in DataFusion.

6 months ago, my hypothesis was that partitioning would be used for cross-machine parallelism, while batches would be used for intra-machine parallelism. My idea at the time was: there is a stream of parts, and each part is an iterator of batches. Batch execution runs in rayon, and each part is a future within a stream (via Tokio, potentially on another machine). In this design, the two "buckets" represent different kinds of parallelism: thread parallelism (i.e. same machine) and process parallelism (i.e. cross-machine), which on a single machine would be run by two different thread pools.

But since we use a stream of batches and a stream of parts, I can't think of a way to differentiate them. E.g. let's say that we implement "coalesce batches". When is using it expected to improve performance? When should the optimizer add it to the plan?
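
To make the question concrete, here is a minimal sketch of what "coalesce batches" does, using only the standard library. It is not DataFusion's implementation: `Batch` is a hypothetical stand-in for an Arrow RecordBatch (a single column of values), and `coalesce_batches` and `target_rows` are names made up for illustration.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: a single column of values.
type Batch = Vec<i64>;

/// Concatenate consecutive small batches until each output batch holds at
/// least `target_rows` rows. Only the final batch may be smaller.
/// Assumes `target_rows > 0`.
fn coalesce_batches<I>(input: I, target_rows: usize) -> Vec<Batch>
where
    I: IntoIterator<Item = Batch>,
{
    let mut output = Vec::new();
    let mut buffer: Batch = Vec::new();
    for batch in input {
        buffer.extend(batch);
        if buffer.len() >= target_rows {
            // Emit the buffered rows as one larger batch and start a new buffer.
            output.push(std::mem::take(&mut buffer));
        }
    }
    // Flush whatever is left once the input is exhausted.
    if !buffer.is_empty() {
        output.push(buffer);
    }
    output
}
```

The operator changes B for a part (fewer, larger batches) but leaves the rows and the number of parts untouched, which is why its benefit depends on what B is supposed to control.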

More quantitatively: given N rows, in Spark we can distribute them in P parts, while in DataFusion we can distribute them in P parts and B batches. In Spark, P deals with parallelism in very specific ways (higher P => higher parallelism and more tasks). In DataFusion, it is a bit unclear how the tuple (P, B) affects either of these, and why we have both P and B in the first place (since both run on the same thread pool and both are async).
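
As a small worked example of the (P, B) tuple, the sketch below splits N rows into P parts and each part into B batches, returning the row count of every batch. It assumes B means "batches per part" and an even split; `layout` and `split` are hypothetical helper names, not DataFusion APIs.

```rust
/// Split `total` into `k` near-equal chunks; the first `total % k` chunks
/// receive one extra row.
fn split(total: usize, k: usize) -> Vec<usize> {
    assert!(k > 0, "need at least one chunk");
    (0..k)
        .map(|i| total / k + usize::from(i < total % k))
        .collect()
}

/// Distribute `n` rows over `p` parts with `b` batches per part (assumption:
/// B is counted per part). Returns, for each part, the row count per batch.
fn layout(n: usize, p: usize, b: usize) -> Vec<Vec<usize>> {
    split(n, p).into_iter().map(|rows| split(rows, b)).collect()
}
```

For instance, `layout(10, 2, 2)` gives two parts with batches of 3 and 2 rows each, while `layout(10, 1, 4)` gives one part with batches of 3, 3, 2 and 2 rows. Both hold the same 10 rows; the open question is what differs between them at execution time.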

AFAIK B is not necessarily related to vectorization, as vectorization (at least at the CPU level) happens on much smaller chunks (lane size). B does not lead to higher parallelism either: since batches are part of a stream, there is no way to run two batches from a single part in parallel, as we need to finish the execution of one before starting the next.
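
The point about streams can be sketched with plain threads instead of Tokio. This is an illustration under assumptions, not how DataFusion schedules work: one OS thread per part stands in for one task per part, a `Vec` of batches stands in for the stream, and `sum_per_part` is a hypothetical name.

```rust
use std::thread;

/// Hypothetical stand-in for one part: an ordered sequence of batches,
/// each batch being a single column of values.
type Part = Vec<Vec<i64>>;

/// Parts run in parallel (one thread each). Inside a part, batches are
/// consumed strictly one after another, like items pulled from a stream.
fn sum_per_part(parts: Vec<Part>) -> Vec<i64> {
    let handles: Vec<_> = parts
        .into_iter()
        .map(|batches| {
            thread::spawn(move || {
                let mut total = 0i64;
                // Sequential loop: batch k+1 starts only after batch k is done.
                for batch in batches {
                    total += batch.iter().sum::<i64>();
                }
                total
            })
        })
        .collect();

    // Join in spawn order so results line up with the input parts.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("part thread panicked"))
        .collect()
}
```

In this model the number of threads doing work is bounded by P; splitting a part into more batches only changes how many iterations the inner loop runs.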

This aspect of parallelism in DataFusion was clear to me 6 months ago but became unclear when we converted the RecordBatchReader to a stream.

> [Rust] [DataFusion] Implement "coalesce batches" operator
> ---------------------------------------------------------
>
>                 Key: ARROW-11058
>                 URL: https://issues.apache.org/jira/browse/ARROW-11058
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>             Fix For: 3.0.0
>
>
> When we have a FilterExec in the plan, it can produce lots of small batches and we therefore lose efficiency of vectorized operations.
> We should implement a new CoalesceBatchExec and wrap every FilterExec with one of these so that small batches can be recombined into larger batches to improve the efficiency of upstream operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)