You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/22 10:24:22 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2317: Fix HashJoin evaluating during plan

tustvold opened a new pull request, #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317

   # Which issue does this PR close?
   
   Closes #2173
   
   # Rationale for this change
   
   Follows on from #2310 and applies the same treatment to HashJoinExec
   
   # What changes are included in this PR?
   
   Extracts the common futures shenanigans into a `OnceAsync` and then uses this within both CrossJoinExec and HashJoinExec
   
   # Are there any user-facing changes?
   
   HashJoinExec no longer evaluates during the plan
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2317: Fix HashJoin evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317#discussion_r856067898


##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -294,150 +296,85 @@ impl ExecutionPlan for HashJoinExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
-        // we only want to compute the build side once for PartitionMode::CollectLeft
-        let left_data = {
-            match self.mode {
-                PartitionMode::CollectLeft => {
-                    let mut build_side = self.build_side.lock().await;
-
-                    match build_side.as_ref() {
-                        Some(stream) => stream.clone(),
-                        None => {
-                            let start = Instant::now();
-
-                            // merge all left parts into a single stream
-                            let merge = CoalescePartitionsExec::new(self.left.clone());
-                            let stream = merge.execute(0, context.clone()).await?;
-
-                            // This operation performs 2 steps at once:
-                            // 1. creates a [JoinHashMap] of all batches from the stream
-                            // 2. stores the batches in a vector.
-                            let initial = (0, Vec::new());
-                            let (num_rows, batches) = stream
-                                .try_fold(initial, |mut acc, batch| async {
-                                    acc.0 += batch.num_rows();
-                                    acc.1.push(batch);
-                                    Ok(acc)
-                                })
-                                .await?;
-                            let mut hashmap =
-                                JoinHashMap(RawTable::with_capacity(num_rows));
-                            let mut hashes_buffer = Vec::new();
-                            let mut offset = 0;
-                            for batch in batches.iter() {
-                                hashes_buffer.clear();
-                                hashes_buffer.resize(batch.num_rows(), 0);
-                                update_hash(
-                                    &on_left,
-                                    batch,
-                                    &mut hashmap,
-                                    offset,
-                                    &self.random_state,
-                                    &mut hashes_buffer,
-                                )?;
-                                offset += batch.num_rows();
-                            }
-                            // Merge all batches into a single batch, so we
-                            // can directly index into the arrays
-                            let single_batch =
-                                concat_batches(&self.left.schema(), &batches, num_rows)?;
-
-                            let left_side = Arc::new((hashmap, single_batch));
-
-                            *build_side = Some(left_side.clone());
-
-                            debug!(
-                                "Built build-side of hash join containing {} rows in {} ms",
-                                num_rows,
-                                start.elapsed().as_millis()
-                            );
-
-                            left_side
-                        }
-                    }
-                }
-                PartitionMode::Partitioned => {
-                    let start = Instant::now();
-
-                    // Load 1 partition of left side in memory
-                    let stream = self.left.execute(partition, context.clone()).await?;
-
-                    // This operation performs 2 steps at once:
-                    // 1. creates a [JoinHashMap] of all batches from the stream
-                    // 2. stores the batches in a vector.
-                    let initial = (0, Vec::new());
-                    let (num_rows, batches) = stream
-                        .try_fold(initial, |mut acc, batch| async {
-                            acc.0 += batch.num_rows();
-                            acc.1.push(batch);
-                            Ok(acc)
-                        })
-                        .await?;
-                    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
-                    let mut hashes_buffer = Vec::new();
-                    let mut offset = 0;
-                    for batch in batches.iter() {
-                        hashes_buffer.clear();
-                        hashes_buffer.resize(batch.num_rows(), 0);
-                        update_hash(
-                            &on_left,
-                            batch,
-                            &mut hashmap,
-                            offset,
-                            &self.random_state,
-                            &mut hashes_buffer,
-                        )?;
-                        offset += batch.num_rows();
-                    }
-                    // Merge all batches into a single batch, so we
-                    // can directly index into the arrays
-                    let single_batch =
-                        concat_batches(&self.left.schema(), &batches, num_rows)?;
-
-                    let left_side = Arc::new((hashmap, single_batch));
-
-                    debug!(
-                        "Built build-side {} of hash join containing {} rows in {} ms",
-                        partition,
-                        num_rows,
-                        start.elapsed().as_millis()
-                    );
+        let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();
 
-                    left_side
+        let left_fut = match self.mode {
+            PartitionMode::CollectLeft => self.left_fut.once(|| {
+                collect_left_input(
+                    self.random_state.clone(),
+                    self.left.clone(),
+                    on_left.clone(),
+                    context.clone(),
+                )
+            }),
+            PartitionMode::Partitioned => {
+                let start = Instant::now();
+
+                // Load 1 partition of left side in memory
+                let stream = self.left.execute(partition, context.clone()).await?;
+
+                // This operation performs 2 steps at once:
+                // 1. creates a [JoinHashMap] of all batches from the stream
+                // 2. stores the batches in a vector.
+                let initial = (0, Vec::new());
+                let (num_rows, batches) = stream
+                    .try_fold(initial, |mut acc, batch| async {
+                        acc.0 += batch.num_rows();
+                        acc.1.push(batch);
+                        Ok(acc)
+                    })
+                    .await?;
+
+                let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
+                let mut hashes_buffer = Vec::new();
+                let mut offset = 0;
+                for batch in batches.iter() {
+                    hashes_buffer.clear();
+                    hashes_buffer.resize(batch.num_rows(), 0);
+                    update_hash(
+                        &on_left,
+                        batch,
+                        &mut hashmap,
+                        offset,
+                        &self.random_state,
+                        &mut hashes_buffer,
+                    )?;
+                    offset += batch.num_rows();
                 }
+                // Merge all batches into a single batch, so we
+                // can directly index into the arrays
+                let single_batch =
+                    concat_batches(&self.left.schema(), &batches, num_rows)?;
+
+                debug!(
+                    "Built build-side {} of hash join containing {} rows in {} ms",
+                    partition,
+                    num_rows,
+                    start.elapsed().as_millis()
+                );
+
+                OnceFut::ready(Ok((hashmap, single_batch)))
             }
         };
 
         // we have the batches and the hash map with their keys. We can how create a stream
         // over the right that uses this information to issue new batches.
+        let right_stream = self.right.execute(partition, context).await?;
 
-        let right_stream = self.right.execute(partition, context.clone()).await?;
-        let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();
-
-        let num_rows = left_data.1.num_rows();

Review Comment:
   This logic is moved into the stream implementation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2317: Fix HashJoin evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317#discussion_r856068064


##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -294,150 +296,85 @@ impl ExecutionPlan for HashJoinExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
-        // we only want to compute the build side once for PartitionMode::CollectLeft
-        let left_data = {
-            match self.mode {
-                PartitionMode::CollectLeft => {
-                    let mut build_side = self.build_side.lock().await;
-
-                    match build_side.as_ref() {
-                        Some(stream) => stream.clone(),
-                        None => {
-                            let start = Instant::now();

Review Comment:
   This logic is moved into collect_left_input



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2317: Fix HashJoin evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317#discussion_r856067490


##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -294,150 +296,85 @@ impl ExecutionPlan for HashJoinExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let on_left = self.on.iter().map(|on| on.0.clone()).collect::<Vec<_>>();
-        // we only want to compute the build side once for PartitionMode::CollectLeft
-        let left_data = {
-            match self.mode {
-                PartitionMode::CollectLeft => {
-                    let mut build_side = self.build_side.lock().await;
-
-                    match build_side.as_ref() {
-                        Some(stream) => stream.clone(),
-                        None => {
-                            let start = Instant::now();
-
-                            // merge all left parts into a single stream
-                            let merge = CoalescePartitionsExec::new(self.left.clone());
-                            let stream = merge.execute(0, context.clone()).await?;
-
-                            // This operation performs 2 steps at once:
-                            // 1. creates a [JoinHashMap] of all batches from the stream
-                            // 2. stores the batches in a vector.
-                            let initial = (0, Vec::new());
-                            let (num_rows, batches) = stream
-                                .try_fold(initial, |mut acc, batch| async {
-                                    acc.0 += batch.num_rows();
-                                    acc.1.push(batch);
-                                    Ok(acc)
-                                })
-                                .await?;
-                            let mut hashmap =
-                                JoinHashMap(RawTable::with_capacity(num_rows));
-                            let mut hashes_buffer = Vec::new();
-                            let mut offset = 0;
-                            for batch in batches.iter() {
-                                hashes_buffer.clear();
-                                hashes_buffer.resize(batch.num_rows(), 0);
-                                update_hash(
-                                    &on_left,
-                                    batch,
-                                    &mut hashmap,
-                                    offset,
-                                    &self.random_state,
-                                    &mut hashes_buffer,
-                                )?;
-                                offset += batch.num_rows();
-                            }
-                            // Merge all batches into a single batch, so we
-                            // can directly index into the arrays
-                            let single_batch =
-                                concat_batches(&self.left.schema(), &batches, num_rows)?;
-
-                            let left_side = Arc::new((hashmap, single_batch));
-
-                            *build_side = Some(left_side.clone());
-
-                            debug!(
-                                "Built build-side of hash join containing {} rows in {} ms",
-                                num_rows,
-                                start.elapsed().as_millis()
-                            );
-
-                            left_side
-                        }
-                    }
-                }
-                PartitionMode::Partitioned => {
-                    let start = Instant::now();
-
-                    // Load 1 partition of left side in memory
-                    let stream = self.left.execute(partition, context.clone()).await?;
-
-                    // This operation performs 2 steps at once:
-                    // 1. creates a [JoinHashMap] of all batches from the stream
-                    // 2. stores the batches in a vector.
-                    let initial = (0, Vec::new());
-                    let (num_rows, batches) = stream
-                        .try_fold(initial, |mut acc, batch| async {
-                            acc.0 += batch.num_rows();
-                            acc.1.push(batch);
-                            Ok(acc)
-                        })
-                        .await?;
-                    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
-                    let mut hashes_buffer = Vec::new();
-                    let mut offset = 0;
-                    for batch in batches.iter() {
-                        hashes_buffer.clear();
-                        hashes_buffer.resize(batch.num_rows(), 0);
-                        update_hash(
-                            &on_left,
-                            batch,
-                            &mut hashmap,
-                            offset,
-                            &self.random_state,
-                            &mut hashes_buffer,
-                        )?;
-                        offset += batch.num_rows();
-                    }
-                    // Merge all batches into a single batch, so we
-                    // can directly index into the arrays
-                    let single_batch =
-                        concat_batches(&self.left.schema(), &batches, num_rows)?;
-
-                    let left_side = Arc::new((hashmap, single_batch));
-
-                    debug!(
-                        "Built build-side {} of hash join containing {} rows in {} ms",
-                        partition,
-                        num_rows,
-                        start.elapsed().as_millis()
-                    );
+        let on_right = self.on.iter().map(|on| on.1.clone()).collect::<Vec<_>>();

Review Comment:
   I recommend comparing with whitespace ignored - https://github.com/apache/arrow-datafusion/pull/2317/files?w=1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb merged pull request #2317: Fix HashJoin evaluating during plan

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2317: Fix HashJoin evaluating during plan

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2317:
URL: https://github.com/apache/arrow-datafusion/pull/2317#discussion_r858571316


##########
datafusion/core/src/physical_plan/hash_join.rs:
##########
@@ -979,21 +953,40 @@ fn produce_from_matched(
     RecordBatch::try_new(schema.clone(), columns)
 }
 
-impl Stream for HashJoinStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
+impl HashJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so

Review Comment:
   ```suggestion
       /// Separate implementation function that unpins the [`HashJoinStream`] so
   ```



##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -34,24 +33,19 @@ use super::{
 };
 use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
-use futures::future::{BoxFuture, Shared};
 use std::time::Instant;
 
 use super::{
     coalesce_batches::concat_batches, DisplayFormatType, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream,
 };
 use crate::execution::context::TaskContext;
+use crate::physical_plan::join_utils::{OnceAsync, OnceFut};

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org