Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/21 19:00:39 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

alamb commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855507539


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));

Review Comment:
   `ready!` effectively does `return Poll::Pending` whenever `left_fut` returns `Poll::Pending`, right?
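   A minimal sketch of that behaviour. It assumes the `ready!` in the diff has the same semantics as std's `std::task::ready!` (stable since Rust 1.64), which is what the sketch uses. `CountdownFuture`, `NoopWaker`, `poll_doubled`, `poll_doubled_expanded` and `drive` are hypothetical names for illustration only. The macro version and the hand-expanded `match` version return the same sequence of `Poll` values, and in both the code after the macro only runs once the inner future is ready.

   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{ready, Context, Poll, Wake, Waker};

   /// Hypothetical future: returns `Pending` `remaining` times, then `Ready(value)`.
   struct CountdownFuture {
       remaining: u32,
       value: u64,
   }

   impl Future for CountdownFuture {
       type Output = u64;

       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
           if self.remaining == 0 {
               Poll::Ready(self.value)
           } else {
               self.remaining -= 1;
               // Ask to be polled again; a real future would instead register
               // the waker with whatever event it is waiting on.
               cx.waker().wake_by_ref();
               Poll::Pending
           }
       }
   }

   /// Uses `ready!`: if the inner poll is `Pending`, this function returns
   /// `Poll::Pending` immediately and the `* 2` line is not reached.
   fn poll_doubled(fut: &mut CountdownFuture, cx: &mut Context<'_>) -> Poll<u64> {
       let v = ready!(Pin::new(fut).poll(cx));
       Poll::Ready(v * 2)
   }

   /// The same function with `ready!` expanded by hand.
   fn poll_doubled_expanded(fut: &mut CountdownFuture, cx: &mut Context<'_>) -> Poll<u64> {
       let v = match Pin::new(fut).poll(cx) {
           Poll::Ready(v) => v,
           Poll::Pending => return Poll::Pending,
       };
       Poll::Ready(v * 2)
   }

   /// Waker that does nothing; enough to drive a future by hand in a loop.
   struct NoopWaker;

   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   /// Polls `poll_fn` against a fresh `CountdownFuture` until it is ready,
   /// recording every `Poll` value it returned along the way.
   fn drive(
       poll_fn: fn(&mut CountdownFuture, &mut Context<'_>) -> Poll<u64>,
       pending_polls: u32,
       value: u64,
   ) -> Vec<Poll<u64>> {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut fut = CountdownFuture { remaining: pending_polls, value };
       let mut trace = Vec::new();
       loop {
           let p = poll_fn(&mut fut, &mut cx);
           trace.push(p);
           if p.is_ready() {
               return trace;
           }
       }
   }

   fn main() {
       // Both print [Pending, Pending, Ready(42)].
       println!("{:?}", drive(poll_doubled, 2, 21));
       println!("{:?}", drive(poll_doubled_expanded, 2, 21));
   }
   ```

   The early `return` is also why `left_result` can be cached in `self.left_result`: a `Pending` result leaves the stream state untouched, and the next `poll_next` call simply polls `left_fut` again.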



##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -102,6 +106,37 @@ impl CrossJoinExec {
     }
 }
 
+/// Asynchronously collect the result of the left child
+async fn load(

Review Comment:
   Maybe call it
   ```suggestion
   async fn load_left_input(
   ```
   
   to make it more explicit what is going on?



##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));
+                self.left_result.insert(result)
+            }
+        };
+
+        let left_data = match left_result.as_ref() {
+            Ok(left_data) => left_data,
+            Err(e) => {
+                return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                    e.to_string().into(),
+                ))))
+            }
+        };
+
+        if left_data.num_rows() == 0 {

Review Comment:
   I am not sure this is correct in the context of outer joins: even if the left data has no rows, a join that preserves the right side may still produce output (the unmatched right rows, padded with nulls)...
   
   However, I see the original code did the same, so 🤷 
   
   ```rust
   if left_data.num_rows() == 0 {
       return Ok(Box::pin(MemoryStream::try_new(
           vec![],
           self.schema.clone(),
           None,
       )?));
   }
   ```
   
   (it probably only matters for joins that don't have an equality predicate)
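   A minimal sketch of the concern, using a naive nested-loop join over single-column integer rows instead of Arrow `RecordBatch`es. `Kind`, `nested_loop_join` and `join_with_empty_left_shortcut` are hypothetical names, not DataFusion APIs. With an empty left side a cross join always yields zero rows, so the early return is harmless there, but a right outer join still emits every right row padded with NULL, and an unconditional early return on empty left input drops those rows.

   ```rust
   /// Hypothetical join kinds for this sketch (not DataFusion's `JoinType`).
   #[derive(Clone, Copy, Debug)]
   enum Kind {
       Cross,
       RightOuter,
   }

   /// Naive nested-loop join. `pred` is the join condition; a cross join ignores
   /// it and pairs every left row with every right row. Output rows are
   /// `(left, right)`, with `None` standing in for a NULL left side.
   fn nested_loop_join(
       left: &[i32],
       right: &[i32],
       kind: Kind,
       pred: impl Fn(i32, i32) -> bool,
   ) -> Vec<(Option<i32>, i32)> {
       let mut out = Vec::new();
       for &r in right {
           let mut matched = false;
           for &l in left {
               let is_match = match kind {
                   Kind::Cross => true,
                   Kind::RightOuter => pred(l, r),
               };
               if is_match {
                   out.push((Some(l), r));
                   matched = true;
               }
           }
           // A right outer join keeps every unmatched right row, padded with NULL.
           if !matched && matches!(kind, Kind::RightOuter) {
               out.push((None, r));
           }
       }
       out
   }

   /// Same join, but returning early when the left side is empty, mirroring
   /// the `left_data.num_rows() == 0` check in the diff.
   fn join_with_empty_left_shortcut(
       left: &[i32],
       right: &[i32],
       kind: Kind,
       pred: impl Fn(i32, i32) -> bool,
   ) -> Vec<(Option<i32>, i32)> {
       if left.is_empty() {
           return Vec::new();
       }
       nested_loop_join(left, right, kind, pred)
   }

   fn main() {
       let right = [1, 2];
       let lt = |l: i32, r: i32| l < r;
       // Cross join with empty left: [] (the shortcut changes nothing here).
       println!("{:?}", nested_loop_join(&[], &right, Kind::Cross, lt));
       // Right outer join with empty left: [(None, 1), (None, 2)].
       println!("{:?}", nested_loop_join(&[], &right, Kind::RightOuter, lt));
       // The shortcut loses those rows: [].
       println!("{:?}", join_with_empty_left_shortcut(&[], &right, Kind::RightOuter, lt));
   }
   ```

   For a pure cross join the product of row counts is zero whenever either side is empty, so the check only becomes a problem if this code path is reused for outer joins without an equality predicate.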



-- 
This is an automated message from the Apache Git Service.