You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2020/12/22 11:47:54 UTC
[arrow] branch master updated: ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 53a36f5 ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once
53a36f5 is described below
commit 53a36f5435b88f96906b30447d813dd3ed02dd9a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Dec 22 06:46:49 2020 -0500
ARROW-10703: [Rust] [DataFusion] Compute build-side of hash join once
This introduces a mutex so that the build side (the left input) of the hash join is computed once and then reused by every partition.
I partitioned the TPC-H SF=1 dataset so that the orders table had 2 partitions and the lineitem table had 8 partitions. By adding debug logging, I confirmed that the build side was built only once.
I ran query 12 with master and this PR.
Benchmark results from master branch:
```
Query 12 iteration 0 took 856.2 ms
Query 12 iteration 1 took 577.5 ms
Query 12 iteration 2 took 579.7 ms
Query 12 iteration 3 took 562.9 ms
Query 12 iteration 4 took 590.9 ms
Query 12 avg time: 633.44 ms
```
Benchmark results from this PR:
```
Query 12 iteration 0 took 307.6 ms
Query 12 iteration 1 took 296.8 ms
Query 12 iteration 2 took 307.4 ms
Query 12 iteration 3 took 304.5 ms
Query 12 iteration 4 took 318.5 ms
Query 12 avg time: 306.95 ms
```
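The average time dropped from 633.44 ms to 306.95 ms, a speedup very close to 2x. That was the expected outcome: previously the build side was recomputed for every right-side partition (twice in this setup), and now it is built only once.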
Closes #8981 from andygrove/join-compute-build-side-once
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
rust/datafusion/Cargo.toml | 2 +-
rust/datafusion/src/physical_plan/hash_join.rs | 83 +++++++++++++++-----------
2 files changed, 50 insertions(+), 35 deletions(-)
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 9ca3610..3108f0c 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -59,7 +59,7 @@ chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.0"
-tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
+tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded", "sync"] }
[dev-dependencies]
rand = "0.7"
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs b/rust/datafusion/src/physical_plan/hash_join.rs
index 0724d37..1af388f 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -25,6 +25,7 @@ use std::{any::Any, collections::HashSet};
use async_trait::async_trait;
use futures::{Stream, StreamExt, TryStreamExt};
use hashbrown::HashMap;
+use tokio::sync::Mutex;
use arrow::array::{make_array, Array, MutableArrayData};
use arrow::datatypes::DataType;
@@ -60,7 +61,7 @@ type RightIndex = Option<usize>;
// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
// for rows 3 and 8 from batch 0 and row 6 from batch 1.
type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
-type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
+type JoinLeftData = Arc<(JoinHashMap, Vec<RecordBatch>)>;
/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
@@ -76,6 +77,8 @@ pub struct HashJoinExec {
join_type: JoinType,
/// The schema once the join is applied
schema: SchemaRef,
+ /// Build-side
+ build_side: Arc<Mutex<Option<JoinLeftData>>>,
}
impl HashJoinExec {
@@ -110,6 +113,7 @@ impl HashJoinExec {
on,
join_type: *join_type,
schema,
+ build_side: Arc::new(Mutex::new(None)),
})
}
}
@@ -150,48 +154,59 @@ impl ExecutionPlan for HashJoinExec {
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
- // merge all parts into a single stream
- // this is currently expensive as we re-compute this for every part from the right
- // TODO: Fix this issue: we can't share this state across parts on the right.
- // We need to change this `execute` to allow sharing state across parts...
- let merge = MergeExec::new(self.left.clone());
- let stream = merge.execute(0).await?;
-
- let on_left = self
- .on
- .iter()
- .map(|on| on.0.clone())
- .collect::<HashSet<_>>();
- let on_right = self
- .on
- .iter()
- .map(|on| on.1.clone())
- .collect::<HashSet<_>>();
+ // we only want to compute the build side once
+ let left_data = {
+ let mut build_side = self.build_side.lock().await;
+ match build_side.as_ref() {
+ Some(stream) => stream.clone(),
+ None => {
+ // merge all left parts into a single stream
+ let merge = MergeExec::new(self.left.clone());
+ let stream = merge.execute(0).await?;
+
+ let on_left = self
+ .on
+ .iter()
+ .map(|on| on.0.clone())
+ .collect::<HashSet<_>>();
+
+ // This operation performs 2 steps at once:
+ // 1. creates a [JoinHashMap] of all batches from the stream
+ // 2. stores the batches in a vector.
+ let initial = (JoinHashMap::default(), Vec::new(), 0);
+ let left_data = stream
+ .try_fold(initial, |mut acc, batch| async {
+ let hash = &mut acc.0;
+ let values = &mut acc.1;
+ let index = acc.2;
+ update_hash(&on_left, &batch, hash, index).unwrap();
+ values.push(batch);
+ acc.2 += 1;
+ Ok(acc)
+ })
+ .await?;
+
+ let left_side = Arc::new((left_data.0, left_data.1));
+ *build_side = Some(left_side.clone());
+ left_side
+ }
+ }
+ };
- // This operation performs 2 steps at once:
- // 1. creates a [JoinHashMap] of all batches from the stream
- // 2. stores the batches in a vector.
- let initial = (JoinHashMap::default(), Vec::new(), 0);
- let left_data = stream
- .try_fold(initial, |mut acc, batch| async {
- let hash = &mut acc.0;
- let values = &mut acc.1;
- let index = acc.2;
- update_hash(&on_left, &batch, hash, index).unwrap();
- values.push(batch);
- acc.2 += 1;
- Ok(acc)
- })
- .await?;
// we have the batches and the hash map with their keys. We can how create a stream
// over the right that uses this information to issue new batches.
let stream = self.right.execute(partition).await?;
+ let on_right = self
+ .on
+ .iter()
+ .map(|on| on.1.clone())
+ .collect::<HashSet<_>>();
Ok(Box::pin(HashJoinStream {
schema: self.schema.clone(),
on_right,
join_type: self.join_type,
- left_data: (left_data.0, left_data.1),
+ left_data,
right: stream,
}))
}