You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Jorge (Jira)" <ji...@apache.org> on 2020/09/24 10:34:00 UTC

[jira] [Comment Edited] (ARROW-9707) [Rust] [DataFusion] Re-implement threading model

    [ https://issues.apache.org/jira/browse/ARROW-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201435#comment-17201435 ] 

Jorge edited comment on ARROW-9707 at 9/24/20, 10:33 AM:
---------------------------------------------------------

I thought a bit about this, and I have an hypothesis: the core pattern in DataFusion today is:
 # {{ExecutionPlan}} is an iterator of {{RecordBatchReader}} via the function {{build}}
 # {{RecordBatchReaders}} is an iterator of {{RecordBatch}} via {{next_batch}}
 # {{ExecutionPlan}} 's {{size_hint}} is given by {{output_partitioning}}
 # {{RecordBatchReaders}}'s {{size_hint}}  is unknown, as it typically comes from scaning through a file

If this hypothesis holds, IMO we could convert {{ExecutionPlan}} to allow an {{IntoIter<Item=IntoIter<Item=RecordBatch>>}}

which would allow to easily flatten the iterator over a set of threads. Threads switch context during I/O, e.g. when a thread finishes reading a block of batches in a CSV, it can switch to execute whatever it is happening in another place.

This will also give more visibility to a scheduler, as it has all the necessary information it needs to schedule tasks.

[this answer in SO|https://stackoverflow.com/a/53176418/931303] uses IntoIter of IntoIter to implement a generic (single-threaded) {{merge_sort}}. This would also allow other architectures, as they would offload whole partitions to different processes. In our context, something like this (all static for now


{code:java}
fn merge_sorted<IterT: 'static + Send, IterIterT, T: 'static + Ord + Clone + fmt::Debug + Send>(arrays: IterIterT) -> Vec<T>
where
    IterT: IntoIterator<Item = T>,
    IterIterT: IntoIterator<Item = IterT>,
{
    let all_values = Arc::new(Mutex::new(vec![]));
    let threads: Vec<JoinHandle<()>> = arrays.into_iter().map(|array| {
        let mutex_clone = Arc::clone(&all_values);
        thread::spawn(move || {
            let mut values: Vec<T> = array.into_iter().collect();
            values.sort(); // this is wrong, but all information is available to do it
            mutex_clone.lock().unwrap().extend_from_slice(&values);
        })
    }).collect();
    for thread in threads {
        thread.join().unwrap()
    }
    let result = all_values.lock().unwrap().clone();
    result
}
{code}


was (Author: jorgecarleitao):
I thought a bit about this, and I have an hypothesis: the core pattern in DataFusion today is:
 # {{ExecutionPlan}} is an iterator of {{RecordBatchReader}} via the function {{build}}
 # {{RecordBatchReaders}} is an iterator of {{RecordBatch}} via {{next_batch}}
 # {{ExecutionPlan}} 's {{size_hint}} is given by {{output_partitioning}}
 # {{RecordBatchReaders}}{{}} {{size_hint}}  is unknown, as it typically comes from scaning through a file

If this hypothesis holds, IMO we could convert {{ExecutionPlan}} to allow an {{IntoIter<Item=IntoIter<Item=RecordBatch>>}}

which would allow to easily flatten the iterator over a set of threads. Threads switch context during I/O, e.g. when a thread finishes reading a block of batches in a CSV, it can switch to execute whatever it is happening in another place.

This will also give more visibility to a scheduler, as it has all the necessary information it needs to schedule tasks.

[this answer in SO|https://stackoverflow.com/a/53176418/931303] uses IntoIter of IntoIter to implement a generic (single-threaded) {{merge_sort}}. This would also allow other architectures, as they would offload whole partitions to different processes. In our context, something like this (all static for now


{code:java}
fn merge_sorted<IterT: 'static + Send, IterIterT, T: 'static + Ord + Clone + fmt::Debug + Send>(arrays: IterIterT) -> Vec<T>
where
    IterT: IntoIterator<Item = T>,
    IterIterT: IntoIterator<Item = IterT>,
{
    let all_values = Arc::new(Mutex::new(vec![]));
    let threads: Vec<JoinHandle<()>> = arrays.into_iter().map(|array| {
        let mutex_clone = Arc::clone(&all_values);
        thread::spawn(move || {
            let mut values: Vec<T> = array.into_iter().collect();
            values.sort(); // this is wrong, but all information is available to do it
            mutex_clone.lock().unwrap().extend_from_slice(&values);
        })
    }).collect();
    for thread in threads {
        thread.join().unwrap()
    }
    let result = all_values.lock().unwrap().clone();
    result
}
{code}

> [Rust] [DataFusion] Re-implement threading model
> ------------------------------------------------
>
>                 Key: ARROW-9707
>                 URL: https://issues.apache.org/jira/browse/ARROW-9707
>             Project: Apache Arrow
>          Issue Type: Sub-task
>          Components: Rust, Rust - DataFusion
>            Reporter: Andy Grove
>            Assignee: Andy Grove
>            Priority: Major
>             Fix For: 2.0.0
>
>
> The current threading model is very simple and does not scale. We currently use 1-2 dedicated threads per partition and they all run simultaneously, which is a huge problem if you have more partitions than logical or physical cores.
> This task is to re-implement the threading model so that query execution uses a fixed (configurable) number of threads. Work will be broken down into stages and tasks and each in-process executor (running on a dedicated thread) will process its queue of tasks.
> This process will be driven by a scheduler.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)