You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "Dandandan (via GitHub)" <gi...@apache.org> on 2023/03/08 19:12:08 UTC

[GitHub] [arrow-datafusion] Dandandan commented on a diff in pull request #5490: Memory limited hash join

Dandandan commented on code in PR #5490:
URL: https://github.com/apache/arrow-datafusion/pull/5490#discussion_r1129916540


##########
datafusion/core/src/physical_plan/joins/hash_join.rs:
##########
@@ -493,90 +491,73 @@ impl ExecutionPlan for HashJoinExec {
 }
 
 async fn collect_left_input(
+    partition: Option<usize>,
     random_state: RandomState,
     left: Arc<dyn ExecutionPlan>,
     on_left: Vec<Column>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
     let schema = left.schema();
-    let start = Instant::now();
-    // merge all left parts into a single stream
-    let merge = {
-        if left.output_partitioning().partition_count() != 1 {
-            Arc::new(CoalescePartitionsExec::new(left))
-        } else {
-            left
-        }
-    };
-    let stream = merge.execute(0, context)?;
-
-    // This operation performs 2 steps at once:
-    // 1. creates a [JoinHashMap] of all batches from the stream
-    // 2. stores the batches in a vector.
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
-        .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
-            Ok(acc)
-        })
-        .await?;
 
-    let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
-    let mut hashes_buffer = Vec::new();
-    let mut offset = 0;
-    for batch in batches.iter() {
-        hashes_buffer.clear();
-        hashes_buffer.resize(batch.num_rows(), 0);
-        update_hash(
-            &on_left,
-            batch,
-            &mut hashmap,
-            offset,
-            &random_state,
-            &mut hashes_buffer,
-        )?;
-        offset += batch.num_rows();
-    }
-    // Merge all batches into a single batch, so we
-    // can directly index into the arrays
-    let single_batch = concat_batches(&schema, &batches, num_rows)?;
-
-    debug!(
-        "Built build-side of hash join containing {} rows in {} ms",
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
-    Ok((hashmap, single_batch))
-}
-
-async fn partitioned_left_input(
-    partition: usize,
-    random_state: RandomState,
-    left: Arc<dyn ExecutionPlan>,
-    on_left: Vec<Column>,
-    context: Arc<TaskContext>,
-) -> Result<JoinLeftData> {
-    let schema = left.schema();
+    let (left_input, left_input_partition) = if let Some(partition) = partition {
+        (left, partition)
+    } else {
+        let merge = {
+            if left.output_partitioning().partition_count() != 1 {
+                Arc::new(CoalescePartitionsExec::new(left))
+            } else {
+                left
+            }
+        };
 
-    let start = Instant::now();
+        (merge, 0)
+    };
 
-    // Load 1 partition of left side in memory
-    let stream = left.execute(partition, context.clone())?;
+    // Depending on partition argument load single partition or whole left side in memory
+    let stream = left_input.execute(left_input_partition, context.clone())?;
 
     // This operation performs 2 steps at once:
     // 1. creates a [JoinHashMap] of all batches from the stream
     // 2. stores the batches in a vector.
-    let initial = (0, Vec::new());
-    let (num_rows, batches) = stream
+    let initial = (Vec::new(), 0, metrics, reservation);
+    let (batches, num_rows, metrics, reservation) = stream
         .try_fold(initial, |mut acc, batch| async {
-            acc.0 += batch.num_rows();
-            acc.1.push(batch);
+            let batch_size = batch.get_array_memory_size();
+            // Reserve memory for incoming batch
+            acc.3.lock().try_grow(batch_size)?;
+            // Update metrics
+            acc.2.build_mem_used.add(batch_size);
+            acc.2.build_input_batches.add(1);
+            acc.2.build_input_rows.add(batch.num_rows());
+            // Update rowcount
+            acc.1 += batch.num_rows();
+            // Push batch to output
+            acc.0.push(batch);
             Ok(acc)
         })
         .await?;
 
+    // Estimation of memory size, required for hashtable, prior to allocation.
+    // Final result can be verified using `RawTable.allocation_info()`
+    //
+    // For majority of cases hashbrown overestimates buckets qty to keep ~1/8 of them empty.
+    // This formula leads to overallocation for small tables (< 8 elements) but fine overall.
+    let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
+        DataFusionError::Execution(
+            "usize overflow while estimating number of hasmap buckets".to_string(),
+        )
+    })? / 7)
+        .next_power_of_two();
+    // 32 bytes per `(u64, SmallVec<[u64; 1]>)`
+    // + 1 byte for each bucket
+    // + 16 bytes fixed
+    let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;
+
+    reservation.lock().try_grow(estimated_hastable_size)?;
+    metrics.build_mem_used.add(estimated_hastable_size);
+
     let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));

Review Comment:
   Ah I didn't realize the allocation is the same, only guarding against overallocation👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org