You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/22 11:47:54 UTC

[arrow] branch master updated: ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 53a36f5  ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once
53a36f5 is described below

commit 53a36f5435b88f96906b30447d813dd3ed02dd9a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Dec 22 06:46:49 2020 -0500

    ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once
    
    This simply introduces a mutex so that the left-side is computed once and then re-used.
    
    I partitioned the TPC-H SF=1 data set so that the orders table had 2 partitions and the lineitem table had 8 partitions. By adding debug logging, I confirmed that the build-side was built only once.
    
    I ran query 12 with master and this PR.
    
    Benchmark results from master branch:
    
    ```
    Query 12 iteration 0 took 856.2 ms
    Query 12 iteration 1 took 577.5 ms
    Query 12 iteration 2 took 579.7 ms
    Query 12 iteration 3 took 562.9 ms
    Query 12 iteration 4 took 590.9 ms
    Query 12 avg time: 633.44 ms
    ```
    
    Benchmark results from this PR:
    
    ```
    Query 12 iteration 0 took 307.6 ms
    Query 12 iteration 1 took 296.8 ms
    Query 12 iteration 2 took 307.4 ms
    Query 12 iteration 3 took 304.5 ms
    Query 12 iteration 4 took 318.5 ms
    Query 12 avg time: 306.95 ms
    ```
    
    The speedup is very close to 2x, which was the expected outcome since the build-side is now built once instead of twice.
    
    Closes #8981 from andygrove/join-compute-build-side-once
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/datafusion/Cargo.toml                     |  2 +-
 rust/datafusion/src/physical_plan/hash_join.rs | 83 +++++++++++++++-----------
 2 files changed, 50 insertions(+), 35 deletions(-)

diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 9ca3610..3108f0c 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -59,7 +59,7 @@ chrono = "0.4"
 async-trait = "0.1.41"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
-tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
+tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded", "sync"] }
 
 [dev-dependencies]
 rand = "0.7"
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 0724d37..1af388f 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -25,6 +25,7 @@ use std::{any::Any, collections::HashSet};
 use async_trait::async_trait;
 use futures::{Stream, StreamExt, TryStreamExt};
 use hashbrown::HashMap;
+use tokio::sync::Mutex;
 
 use arrow::array::{make_array, Array, MutableArrayData};
 use arrow::datatypes::DataType;
@@ -60,7 +61,7 @@ type RightIndex = Option<usize>;
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
 type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;
 
 /// join execution plan executes partitions in parallel and combines them into a set of
 /// partitions.
@@ -76,6 +77,8 @@ pub struct HashJoinExec {
     join_type: JoinType,
     /// The schema once the join is applied
     schema: SchemaRef,
+    /// Build-side
+    build_side: Arc<Mutex<Option<JoinLeftData>>>,
 }
 
 impl HashJoinExec {
@@ -110,6 +113,7 @@ impl HashJoinExec {
             on,
             join_type: *join_type,
             schema,
+            build_side: Arc::new(Mutex::new(None)),
         })
     }
 }
@@ -150,48 +154,59 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        // merge all parts into a single stream
-        // this is currently expensive as we re-compute this for every part from the right
-        // TODO: Fix this issue: we can't share this state across parts on the right.
-        // We need to change this `execute` to allow sharing state across parts...
-        let merge = MergeExec::new(self.left.clone());
-        let stream = merge.execute(0).await?;
-
-        let on_left = self
-            .on
-            .iter()
-            .map(|on| on.0.clone())
-            .collect::<HashSet<_>>();
-        let on_right = self
-            .on
-            .iter()
-            .map(|on| on.1.clone())
-            .collect::<HashSet<_>>();
+        // we only want to compute the build side once
+        let left_data = {
+            let mut build_side = self.build_side.lock().await;
+            match build_side.as_ref() {
+                Some(stream) => stream.clone(),
+                None => {
+                    // merge all left parts into a single stream
+                    let merge = MergeExec::new(self.left.clone());
+                    let stream = merge.execute(0).await?;
+
+                    let on_left = self
+                        .on
+                        .iter()
+                        .map(|on| on.0.clone())
+                        .collect::<HashSet<_>>();
+
+                    // This operation performs 2 steps at once:
+                    // 1. creates a [JoinHashMap] of all batches from the stream
+                    // 2. stores the batches in a vector.
+                    let initial = (JoinHashMap::default(), Vec::new(), 0);
+                    let left_data = stream
+                        .try_fold(initial, |mut acc, batch| async {
+                            let hash = &mut acc.0;
+                            let values = &mut acc.1;
+                            let index = acc.2;
+                            update_hash(&on_left, &batch, hash, index).unwrap();
+                            values.push(batch);
+                            acc.2 += 1;
+                            Ok(acc)
+                        })
+                        .await?;
+
+                    let left_side = Arc::new((left_data.0, left_data.1));
+                    *build_side = Some(left_side.clone());
+                    left_side
+                }
+            }
+        };
 
-        // This operation performs 2 steps at once:
-        // 1. creates a [JoinHashMap] of all batches from the stream
-        // 2. stores the batches in a vector.
-        let initial = (JoinHashMap::default(), Vec::new(), 0);
-        let left_data = stream
-            .try_fold(initial, |mut acc, batch| async {
-                let hash = &mut acc.0;
-                let values = &mut acc.1;
-                let index = acc.2;
-                update_hash(&on_left, &batch, hash, index).unwrap();
-                values.push(batch);
-                acc.2 += 1;
-                Ok(acc)
-            })
-            .await?;
         // we have the batches and the hash map with their keys. We can how create a stream
         // over the right that uses this information to issue new batches.
 
         let stream = self.right.execute(partition).await?;
+        let on_right = self
+            .on
+            .iter()
+            .map(|on| on.1.clone())
+            .collect::<HashSet<_>>();
         Ok(Box::pin(HashJoinStream {
             schema: self.schema.clone(),
             on_right,
             join_type: self.join_type,
-            left_data: (left_data.0, left_data.1),
+            left_data,
             right: stream,
         }))
     }