Posted to jira@arrow.apache.org by "Jorge (Jira)" <ji...@apache.org> on 2020/09/24 10:34:00 UTC
[jira] [Comment Edited] (ARROW-9707) [Rust] [DataFusion] Re-implement threading model
[ https://issues.apache.org/jira/browse/ARROW-9707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17201435#comment-17201435 ]
Jorge edited comment on ARROW-9707 at 9/24/20, 10:33 AM:
---------------------------------------------------------
I thought a bit about this and have a hypothesis. The core pattern in DataFusion today is:
# {{ExecutionPlan}} is an iterator of {{RecordBatchReader}} via the function {{build}}
# {{RecordBatchReader}} is an iterator of {{RecordBatch}} via {{next_batch}}
# {{ExecutionPlan}}'s {{size_hint}} is given by {{output_partitioning}}
# {{RecordBatchReader}}'s {{size_hint}} is unknown, as it typically comes from scanning through a file
If this hypothesis holds, IMO we could convert {{ExecutionPlan}} to expose an {{IntoIterator<Item=IntoIterator<Item=RecordBatch>>}}, which would make it easy to flatten the iteration over a set of threads. Threads switch context during I/O: e.g. when a thread finishes reading a block of batches from a CSV file, it can switch to whatever work is pending elsewhere.
This would also give more visibility to a scheduler, as it would have all the information it needs to schedule tasks.
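To make the shape of that proposal concrete, here is a minimal sketch, not DataFusion's actual API. {{Batch}} is a hypothetical stand-in for {{RecordBatch}} and {{count_rows}} is an invented helper. It spawns one thread per partition only to keep the example short; a real scheduler would presumably bound the number of threads.

```rust
use std::thread;

/// Hypothetical stand-in for Arrow's `RecordBatch`: a single column of values.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch(pub Vec<i64>);

/// Consumes a plan shaped like `IntoIterator<Item = IntoIterator<Item = Batch>>`
/// and returns the total number of rows across all partitions.
pub fn count_rows<Plan, Part>(plan: Plan) -> usize
where
    Plan: IntoIterator<Item = Part>,
    Part: IntoIterator<Item = Batch> + Send + 'static,
{
    // Outer iterator: one item per partition, each moved to its own thread.
    let handles: Vec<thread::JoinHandle<usize>> = plan
        .into_iter()
        .map(|partition| {
            thread::spawn(move || {
                // Inner iterator: batches are pulled one by one inside the thread.
                partition
                    .into_iter()
                    .map(|batch| batch.0.len())
                    .sum::<usize>()
            })
        })
        .collect();

    // Wait for every partition and add up the per-partition counts.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Any nested collection works as the plan here, e.g. a {{Vec<Vec<Batch>>}} with one inner vector per partition.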
[This answer on SO|https://stackoverflow.com/a/53176418/931303] uses an {{IntoIterator}} of {{IntoIterator}} to implement a generic (single-threaded) {{merge_sort}}. This would also allow other architectures, as they could offload whole partitions to different processes. In our context, it could look something like this (all static for now):
{code:java}
use std::fmt;
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

fn merge_sorted<IterT, IterIterT, T>(arrays: IterIterT) -> Vec<T>
where
    T: 'static + Ord + Clone + fmt::Debug + Send,
    IterT: 'static + Send + IntoIterator<Item = T>,
    IterIterT: IntoIterator<Item = IterT>,
{
    // Shared buffer that every worker thread appends its values to.
    let all_values = Arc::new(Mutex::new(vec![]));
    // Spawn one thread per inner iterator (i.e. per partition).
    let threads: Vec<JoinHandle<()>> = arrays
        .into_iter()
        .map(|array| {
            let mutex_clone = Arc::clone(&all_values);
            thread::spawn(move || {
                let mut values: Vec<T> = array.into_iter().collect();
                // this is wrong (each partition is sorted on its own and never
                // merged), but all information is available to do it
                values.sort();
                mutex_clone.lock().unwrap().extend_from_slice(&values);
            })
        })
        .collect();
    for thread in threads {
        thread.join().unwrap();
    }
    let result = all_values.lock().unwrap().clone();
    result
}
{code}
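The merge step that the snippet above leaves out could look roughly like the following sketch. It is one possible approach, not the method from the linked answer: each partition is sorted on its own thread, and the sorted runs are then combined with a k-way merge built on the standard library's {{BinaryHeap}}. The name {{merge_sorted_partitions}} is hypothetical, and the sketch assumes every partition fits in memory.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

pub fn merge_sorted_partitions<Parts, Part, T>(partitions: Parts) -> Vec<T>
where
    Parts: IntoIterator<Item = Part>,
    Part: IntoIterator<Item = T> + Send + 'static,
    T: Ord + Send + 'static,
{
    // Phase 1: sort each partition independently, one thread per partition.
    let handles: Vec<thread::JoinHandle<Vec<T>>> = partitions
        .into_iter()
        .map(|partition| {
            thread::spawn(move || {
                let mut values: Vec<T> = partition.into_iter().collect();
                values.sort();
                values
            })
        })
        .collect();

    // Collect the sorted runs as iterators so their heads can be consumed lazily.
    let mut runs: Vec<std::vec::IntoIter<T>> = handles
        .into_iter()
        .map(|h| h.join().unwrap().into_iter())
        .collect();

    // Phase 2: k-way merge. The heap holds the current head of each run,
    // tagged with the run index; `Reverse` turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (idx, run) in runs.iter_mut().enumerate() {
        if let Some(value) = run.next() {
            heap.push(Reverse((value, idx)));
        }
    }

    let mut merged = Vec::new();
    while let Some(Reverse((value, idx))) = heap.pop() {
        merged.push(value);
        // Refill the heap from the run the popped value came from.
        if let Some(next) = runs[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    merged
}
```

Unlike the shared {{Mutex<Vec<T>>}} above, each thread returns its own sorted run through its {{JoinHandle}}, so no locking is needed and the {{Clone}} bound on {{T}} can be dropped.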
> [Rust] [DataFusion] Re-implement threading model
> ------------------------------------------------
>
> Key: ARROW-9707
> URL: https://issues.apache.org/jira/browse/ARROW-9707
> Project: Apache Arrow
> Issue Type: Sub-task
> Components: Rust, Rust - DataFusion
> Reporter: Andy Grove
> Assignee: Andy Grove
> Priority: Major
> Fix For: 2.0.0
>
>
> The current threading model is very simple and does not scale. We currently use 1-2 dedicated threads per partition and they all run simultaneously, which is a huge problem if you have more partitions than logical or physical cores.
> This task is to re-implement the threading model so that query execution uses a fixed (configurable) number of threads. Work will be broken down into stages and tasks and each in-process executor (running on a dedicated thread) will process its queue of tasks.
> This process will be driven by a scheduler.
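As a rough illustration of the model described in the issue, the sketch below runs an arbitrary number of tasks on a fixed number of worker threads. It is an assumption-laden simplification, not the design that was implemented: {{Task}} and {{run_tasks}} are hypothetical names, the calling thread plays the role of the scheduler, and a single shared queue is used instead of one queue per executor.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Hypothetical unit of work: a closure that produces a partial result.
pub type Task = Box<dyn FnOnce() -> usize + Send + 'static>;

/// Runs all tasks on `num_threads` workers and returns the sum of their results.
pub fn run_tasks(num_threads: usize, tasks: Vec<Task>) -> usize {
    let (task_tx, task_rx) = mpsc::channel::<Task>();
    // The receiver is shared, so workers take turns pulling from one queue.
    let task_rx = Arc::new(Mutex::new(task_rx));
    let (result_tx, result_rx) = mpsc::channel::<usize>();

    // Fixed pool: the thread count does not depend on the number of tasks.
    let workers: Vec<thread::JoinHandle<()>> = (0..num_threads.max(1))
        .map(|_| {
            let task_rx = Arc::clone(&task_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // The lock is released at the end of this statement,
                // so it is not held while the task itself runs.
                let next = task_rx.lock().unwrap().recv();
                match next {
                    Ok(task) => result_tx.send(task()).unwrap(),
                    Err(_) => break, // queue closed and fully drained
                }
            })
        })
        .collect();

    // "Scheduler": enqueue every task, then close the queue.
    for task in tasks {
        task_tx.send(task).unwrap();
    }
    drop(task_tx);
    // Drop the original sender so `result_rx` ends once all workers exit.
    drop(result_tx);

    let total: usize = result_rx.iter().sum();
    for worker in workers {
        worker.join().unwrap();
    }
    total
}
```

With, say, ten tasks and three workers, at most three tasks run at a time, however many partitions the query has.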