Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2023/01/12 21:06:00 UTC

[jira] [Assigned] (HUDI-5023) Add new Executor avoiding Queueing in the write-path

     [ https://issues.apache.org/jira/browse/HUDI-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin reassigned HUDI-5023:
-------------------------------------

    Assignee: Yue Zhang  (was: Alexey Kudinkin)

> Add new Executor avoiding Queueing in the write-path
> ----------------------------------------------------
>
>                 Key: HUDI-5023
>                 URL: https://issues.apache.org/jira/browse/HUDI-5023
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: writer-core
>            Reporter: Alexey Kudinkin
>            Assignee: Yue Zhang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.13.0
>
>
> We should evaluate removing _any queueing_ (BoundedInMemoryQueue, DisruptorQueue) on the write path for multiple reasons:
> *It breaks up the vertical chain of transformations applied to the data*
> Spark (as well as other engines) relies on the notion of _Iteration_ to vertically compose all of the transformations applied to a single record, allowing for effective _stream_ processing where every transformation is applied to an _Iterator yielding records_ from the source. That way:
>  # The chain of transformations* is applied to every record one by one, which effectively limits the amount of memory used to the number of records being read and processed simultaneously (if the reading is not batched, that's just a single record), which in turn allows us
>  # To limit the number of memory allocations required to process a single record. Consider the opposite: if we did it breadth-wise, applying the first transformation to _all_ of the records, we would have to hold every transformed record in memory, which is costly from both a GC-overhead and a pure object-churn perspective (see the sketch below).
>  
> Enqueueing essentially violates both of these invariants, breaking up the {_}stream{_}-like processing model and forcing records to be kept in memory for no good reason.
>  
> * This chain is broken up at shuffling points (the collections of tasks executed between these shuffling points are called stages in Spark)
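>  
> To make the contrast concrete, here is a minimal, self-contained Java sketch (illustrative only, not actual Hudi or Spark code; the map helpers are hypothetical):
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Queue;
> import java.util.function.Function;
> 
> public class IterationSketch {
>   // Depth-wise: transformations are composed vertically over a lazy Iterator,
>   // so only the record currently in flight is held in memory.
>   static <I, O> Iterator<O> map(Iterator<I> source, Function<I, O> fn) {
>     return new Iterator<O>() {
>       public boolean hasNext() { return source.hasNext(); }
>       public O next() { return fn.apply(source.next()); }
>     };
>   }
> 
>   // Breadth-wise (what enqueueing effectively does): the source is drained
>   // into a buffer before the next step runs, keeping every record alive.
>   static <I, O> Queue<O> mapViaQueue(Iterator<I> source, Function<I, O> fn) {
>     Queue<O> buffer = new ArrayDeque<>();
>     source.forEachRemaining(r -> buffer.add(fn.apply(r)));
>     return buffer;
>   }
> 
>   public static void main(String[] args) {
>     Iterator<Integer> source = List.of(1, 2, 3).iterator();
>     // Chained lazily: each record flows through both steps one at a time.
>     Iterator<String> out = map(map(source, i -> i * 2), i -> "record-" + i);
>     out.forEachRemaining(System.out::println);
>   }
> }
> {code}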
>  
> *It requires data to be allocated on the heap*
> As was called out in the previous paragraph, enqueueing raw data read from the source breaks up the _stream_-processing paradigm and forces records to be persisted on the heap.
> Consider the following example: the plain Parquet reader in Spark actually uses a *mutable* `ColumnarBatchRow` providing a row-based view into the batch of data being read from the file.
> Now, since it's a mutable object, we can use it to _iterate_ over all of the records (while doing stream processing), ultimately producing some "output" (writing into another file, a shuffle block, etc.), but we +can't keep a reference to it+ (for example, by +enqueueing+ it) precisely because the object is mutable. Instead we are forced to make a *copy* of it, which obviously requires us to allocate it on the heap.
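>  
> The aliasing hazard can be shown with a short Java sketch; `MutableRow` below is a hypothetical stand-in for `ColumnarBatchRow`, not the actual Spark type:
> {code:java}
> import java.util.ArrayDeque;
> import java.util.Arrays;
> import java.util.Queue;
> 
> public class MutableRowSketch {
>   // Stand-in for a mutable row view: the reader re-points one instance
>   // at successive rows of the current batch instead of allocating per row.
>   static class MutableRow {
>     Object[] values;
>     MutableRow copy() { // materializing a snapshot forces a heap allocation
>       MutableRow c = new MutableRow();
>       c.values = Arrays.copyOf(values, values.length);
>       return c;
>     }
>   }
> 
>   public static void main(String[] args) {
>     MutableRow shared = new MutableRow();
>     Queue<MutableRow> queue = new ArrayDeque<>();
>     Object[][] batch = {{"a", 1}, {"b", 2}};
>     for (Object[] row : batch) {
>       shared.values = row;       // stream processing can consume it in place
>       // queue.add(shared);      // WRONG: every entry would alias the last row
>       queue.add(shared.copy());  // enqueueing safely requires a heap copy
>     }
>     queue.forEach(r -> System.out.println(Arrays.toString(r.values)));
>   }
> }
> {code}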



--
This message was sent by Atlassian Jira
(v8.20.10#820010)