Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/13 15:02:50 UTC

[GitHub] [arrow] alamb commented on a change in pull request #7951: ARROW-9716: [Rust] [DataFusion] Implement limit on concurrent threads in MergeExec

alamb commented on a change in pull request #7951:
URL: https://github.com/apache/arrow/pull/7951#discussion_r470019311



##########
File path: rust/datafusion/src/execution/physical_plan/merge.rs
##########
@@ -64,33 +75,51 @@ struct MergePartition {
     schema: SchemaRef,
     /// Input partitions
     partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of concurrent threads
+    max_concurrency: usize,
+}
+
+fn collect_from_thread(
+    thread: JoinHandle<Result<Vec<RecordBatch>>>,
+    combined_results: &mut Vec<Arc<RecordBatch>>,
+) -> Result<()> {
+    match thread.join() {
+        Ok(join) => {
+            join?
+                .iter()
+                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+            Ok(())
+        }
+        Err(e) => Err(ExecutionError::General(format!(
+            "Error collecting batches from thread: {:?}",
+            e
+        ))),
+    }
 }
 
 impl Partition for MergePartition {
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
-            .partitions
-            .iter()
-            .map(|p| {
-                let p = p.clone();
-                thread::spawn(move || {
-                    let it = p.execute()?;
-                    common::collect(it)
-                })
-            })
-            .collect();
+        let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
+        let mut threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = vec![];
+
+        for partition in &self.partitions {
+            // limit number of concurrent threads
+            if threads.len() == self.max_concurrency {

Review comment:
       This approach (waiting for an arbitrary thread, thread 0, to be done) can lead to cases where the thread you are waiting on happens to have a large amount of work while other threads sit idle waiting on the next partitions. Thus you can block here with only one thread doing work.
   
   Another approach that would avoid that starvation would be to put the partitions into an `Arc<Mutex<Channel>>` and then have the threads each read and process partitions from that channel.
   
   I can try and whip that up if you would like a specific code example of what I am talking about.
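
   A minimal standalone sketch of the channel idea, using only `std::sync::mpsc` with the receiver shared behind an `Arc<Mutex<..>>` (this is illustrative, not the actual `MergePartition` code: here a `u32` stands in for a real `Arc<dyn Partition>`, and doubling the value stands in for `p.execute()` plus batch collection):

   ```rust
   use std::sync::{mpsc, Arc, Mutex};
   use std::thread;

   // Hypothetical helper illustrating the channel-based scheme: a fixed
   // pool of `max_concurrency` workers each pull the next partition as
   // soon as they are free, so no worker blocks behind a slow sibling.
   fn merge_with_channel(partitions: Vec<u32>, max_concurrency: usize) -> Vec<u32> {
       let (tx, rx) = mpsc::channel();
       for p in partitions {
           tx.send(p).unwrap();
       }
       drop(tx); // close the channel so workers exit once it drains

       let rx = Arc::new(Mutex::new(rx));
       let handles: Vec<_> = (0..max_concurrency)
           .map(|_| {
               let rx = Arc::clone(&rx);
               thread::spawn(move || {
                   let mut local = vec![];
                   loop {
                       // lock only long enough to pull the next partition;
                       // the guard is dropped at the end of this statement
                       let next = rx.lock().unwrap().recv();
                       match next {
                           Ok(p) => local.push(p * 2), // stand-in for p.execute()
                           Err(_) => break,            // channel drained and closed
                       }
                   }
                   local
               })
           })
           .collect();

       let mut combined: Vec<u32> = handles
           .into_iter()
           .flat_map(|h| h.join().unwrap())
           .collect();
       combined.sort(); // merge output order is not significant here
       combined
   }

   fn main() {
       println!("{:?}", merge_with_channel((0..10).collect(), 3));
   }
   ```

   Because workers pull from the shared channel on demand, a single expensive partition occupies one worker while the other `max_concurrency - 1` workers keep draining the remaining partitions, which is exactly the starvation the join-thread-0 approach can hit.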

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -62,10 +62,31 @@ use crate::sql::{
 };
 use crate::table::Table;
 
+/// Configuration options for execution context
+#[derive(Copy, Clone)]
+pub struct ExecutionConfig {
+    /// Maximum number of concurrent threads for query execution
+    max_concurrency: usize,
+}
+
+impl ExecutionConfig {
+    /// Create an execution config with default settings
+    pub fn new() -> Self {
+        Self { max_concurrency: 2 }

Review comment:
       I think the canonical choice for default concurrency is "number of CPU cores on the machine", which is the approach that Rayon takes, and I think you can use this Rust library: https://docs.rs/num_cpus/1.13.0/num_cpus/
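
   A sketch of that default (hypothetical helper, not code from the PR): the suggestion above is `num_cpus::get()`, which needs the `num_cpus` crate as a dependency; on current toolchains the standard library's `std::thread::available_parallelism` gives an equivalent std-only value, shown here so the snippet is self-contained:

   ```rust
   use std::thread;

   // Default concurrency = number of CPU cores, as Rayon does.
   // With the num_cpus crate this would be `num_cpus::get()`; the
   // std-only equivalent (Rust 1.59+) is used here, falling back to 1
   // if the core count cannot be determined.
   fn default_concurrency() -> usize {
       thread::available_parallelism()
           .map(|n| n.get())
           .unwrap_or(1)
   }

   fn main() {
       println!("default max_concurrency = {}", default_concurrency());
   }
   ```

   `ExecutionConfig::new()` could then use `Self { max_concurrency: default_concurrency() }` instead of the hard-coded `2`, while still letting callers override it.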

##########
File path: rust/datafusion/src/execution/physical_plan/merge.rs
##########
@@ -64,33 +75,51 @@ struct MergePartition {
     schema: SchemaRef,
     /// Input partitions
     partitions: Vec<Arc<dyn Partition>>,
+    /// Maximum number of concurrent threads
+    max_concurrency: usize,
+}
+
+fn collect_from_thread(

Review comment:
       👍  Nice refactor




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org