You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/03/18 12:02:36 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5632: feat: Memory limited merge join

alamb commented on code in PR #5632:
URL: https://github.com/apache/arrow-datafusion/pull/5632#discussion_r1141007193


##########
datafusion/core/src/physical_plan/joins/sort_merge_join.rs:
##########
@@ -2212,4 +2258,135 @@ mod tests {
         assert_batches_sorted_eq!(expected, &batches);
         Ok(())
     }
+
+    #[tokio::test]
+    async fn overallocation_single_batch() -> Result<()> {
+        let left = build_table(
+            ("a1", &vec![0, 1, 2, 3, 4, 5]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6]),
+            ("c1", &vec![4, 5, 6, 7, 8, 9]),
+        );
+        let right = build_table(
+            ("a2", &vec![0, 10, 20, 30, 40]),
+            ("b2", &vec![1, 3, 4, 6, 8]),
+            ("c2", &vec![50, 60, 70, 80, 90]),
+        );
+        let on = vec![(
+            Column::new_with_schema("b1", &left.schema())?,
+            Column::new_with_schema("b2", &right.schema())?,
+        )];
+        let sort_options = vec![SortOptions::default(); on.len()];
+
+        let join_types = vec![
+            JoinType::Inner,
+            JoinType::Left,
+            JoinType::Right,
+            JoinType::Full,
+            JoinType::LeftSemi,
+            JoinType::LeftAnti,
+        ];
+
+        for join_type in join_types {
+            let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+            let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+            let session_config = SessionConfig::default().with_batch_size(50);
+            let session_ctx = SessionContext::with_config_rt(session_config, runtime);
+            let task_ctx = session_ctx.task_ctx();
+            let join = join_with_options(
+                left.clone(),
+                right.clone(),
+                on.clone(),
+                join_type,
+                sort_options.clone(),
+                false,
+            )?;
+
+            let stream = join.execute(0, task_ctx)?;
+            let err = common::collect(stream).await.unwrap_err();
+
+            assert_contains!(
+                err.to_string(),
+                "Resources exhausted: Failed to allocate additional"

Review Comment:
   Thank you for the unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org