Posted to github@arrow.apache.org by "korowa (via GitHub)" <gi...@apache.org> on 2023/03/08 19:03:52 UTC

[GitHub] [arrow-datafusion] korowa commented on a diff in pull request #5490: Memory limited hash join

korowa commented on code in PR #5490:
URL: https://github.com/apache/arrow-datafusion/pull/5490#discussion_r1129895159


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -1173,6 +1149,18 @@ impl HashJoinStream {
         };
         build_timer.done();
 
+        // Reserving memory for visited_left_side bitmap in case it hasn't been initialized yet
+        // and join_type requires to store it
+        if self.visited_left_side.is_none()

Review Comment:
   That was my first intention, but then I realized that `Option` doesn't have fallible versions of `get_or_insert`-like methods (i.e. `try_get_or_insert_with` would be just right in this case), and the only alternative I saw was storing `visited_left_side` as a `Result<_, DataFusionError>`, which, I guess, would not be really clean either.
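
For illustration, a minimal sketch of what such a fallible helper could look like. `try_get_or_insert_with` here is a hypothetical free function written for this example; it is not part of the standard library or of DataFusion.

```rust
/// Hypothetical helper: a fallible counterpart of `Option::get_or_insert_with`.
/// If `slot` is empty, `init` is called; on error the slot stays empty.
fn try_get_or_insert_with<T, E, F>(slot: &mut Option<T>, init: F) -> Result<&mut T, E>
where
    F: FnOnce() -> Result<T, E>,
{
    if slot.is_none() {
        // `?` returns early on failure, leaving `slot` as `None`.
        *slot = Some(init()?);
    }
    // The slot is guaranteed to be `Some` at this point.
    Ok(slot.as_mut().expect("slot was initialized above"))
}
```

With something like this, the "reserve memory, then create the bitmap" step could be expressed as a single call whose error is propagated with `?`.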



##########
datafusion/core/src/physical_plan/common.rs:
##########
@@ -39,8 +39,14 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+/// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
+/// [`MemoryReservation`] used at query operator level
+/// `Option` wrapper allows to initialize empty reservation in operator constructor,
+/// and set it to actual reservation at stream level.
+pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

Review Comment:
   Unfortunately I haven't found a better solution yet, but I'll check whether there is a better way to wrap `MemoryReservation` into something that lets multiple streams attempt to create (or initialize) the reservation.
   
   Another solution, probably, is to allow creating a `MemoryReservation` without providing a context (in `HashJoinExec::new`), and to register it in `MemoryPool` later (in `HashJoinExec.execute`, when the context is available).
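
To make the current approach concrete, here is a simplified sketch of lazy initialization from multiple streams. `Reservation`, `SharedReservation`, `OperatorReservation` and `get_or_init` are stand-ins invented for this example, and it uses `std::sync::Mutex`, which may differ from the mutex used in the PR.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for DataFusion's `MemoryReservation` (hypothetical, simplified).
#[derive(Debug, Default)]
struct Reservation {
    size: usize,
}

type SharedReservation = Arc<Mutex<Reservation>>;
/// Starts empty in the operator constructor; filled in by the first stream.
type OperatorReservation = Arc<Mutex<Option<SharedReservation>>>;

/// Returns the operator-wide reservation, creating it on the first call.
/// Later callers get a handle to the same underlying reservation.
fn get_or_init(op: &OperatorReservation) -> SharedReservation {
    let mut guard = op.lock().unwrap();
    guard
        .get_or_insert_with(|| Arc::new(Mutex::new(Reservation::default())))
        .clone()
}
```

Whichever stream arrives first creates the reservation; all the others only clone the `Arc`, so they account against the same reservation.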



##########
datafusion/core/src/physical_plan/joins/cross_join.rs:
##########
@@ -452,10 +464,7 @@ impl CrossJoinStream {
 
                     Some(result)
                 }
-                other => {
-                    self.reservation.lock().free();
-                    other
-                }
+                other => other,

Review Comment:
   True, it now seems more reliable to store reservations alongside the left-side data they are meant for, and to utilize [this feature](https://github.com/apache/arrow-datafusion/blob/deeaa5632ed99a58b91767261570756db736d158/datafusion/execution/src/memory_pool/mod.rs#L159).
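
A rough sketch of the idea, assuming the linked feature is that a reservation releases its memory when it is dropped. `Pool`, `Reservation`, `LeftData` and `collect_left` are simplified stand-ins made up for this example, not the DataFusion types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for a memory pool that only tracks the bytes in use.
#[derive(Default)]
struct Pool {
    used: AtomicUsize,
}

/// Stand-in reservation: accounts for `size` bytes until it is dropped.
struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    fn new(pool: Arc<Pool>, size: usize) -> Self {
        pool.used.fetch_add(size, Ordering::SeqCst);
        Self { pool, size }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Memory is returned to the pool without an explicit `free()` call.
        self.pool.used.fetch_sub(self.size, Ordering::SeqCst);
    }
}

/// Left-side data stored together with the reservation that covers it.
struct LeftData {
    rows: Vec<u64>,
    _reservation: Reservation,
}

fn collect_left(pool: Arc<Pool>, rows: Vec<u64>) -> LeftData {
    let bytes = rows.len() * std::mem::size_of::<u64>();
    let reservation = Reservation::new(pool, bytes);
    LeftData { rows, _reservation: reservation }
}
```

Because the reservation lives exactly as long as the data, no stream needs to decide when to call `free()`.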



##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
 }
 
 async fn collect_left_input(
+    partition: Option<usize>,
     random_state: RandomState,
     left: Arc<dyn ExecutionPlan>,
     on_left: Vec<Column>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
-    let start = Instant::now();
-    // merge all left parts into a single stream
-    let merge = {
-        if left.output_partitioning().partition_count() != 1 {
-            Arc::new(CoalescePartitionsExec::new(left))
-        } else {
-            left
-        }
-    };
-    let stream = merge.execute(0, context)?;

Review Comment:
   Correct, these two functions shared the same logic, except for how the left-side stream was sourced.
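
As a toy illustration of how an `Option<usize>` partition argument can select the stream source: the function below works on plain vectors rather than execution plans, and its name and signature are invented for this example.

```rust
/// `Some(idx)` reads a single left partition; `None` merges all of them.
/// Everything after the sourcing step would be shared by both cases.
fn collect_left(partition: Option<usize>, partitions: &[Vec<i32>]) -> Vec<i32> {
    match partition {
        Some(idx) => partitions[idx].clone(),
        None => partitions.iter().flatten().copied().collect(),
    }
}
```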


