Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/21 17:27:05 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request, #2310: Fix CrossJoinExec evaluating during plan

tustvold opened a new pull request, #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310

   # Which issue does this PR close?
   
   Closes #2306
   
   # Rationale for this change
   
   See ticket #2306.
   
   # What changes are included in this PR?
   
   This modifies `CrossJoinExec` so that it computes the left side data as part of stream execution, rather than inside `ExecutionPlan::execute`.
   
   # Are there any user-facing changes?
   
   `CrossJoinExec` no longer evaluates its left input during planning, i.e. inside `ExecutionPlan::execute`.
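
To illustrate the distinction, here is a minimal standard-library sketch, not DataFusion code: `execute_lazy`, `drive`, and `NoopWake` are hypothetical names. Building the future does no work; the stand-in "left side" is only computed when the returned future is first polled, which is the behaviour this PR moves `CrossJoinExec` towards.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll futures that never suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `execute`: it only builds a future and returns.
/// The counter is incremented when the "left side" is actually computed.
fn execute_lazy(loads: Arc<AtomicUsize>) -> Pin<Box<dyn Future<Output = Vec<i32>>>> {
    Box::pin(async move {
        loads.fetch_add(1, Ordering::SeqCst);
        vec![1, 2, 3]
    })
}

/// Poll the future once, as a consumer of the stream eventually would.
fn drive(fut: &mut Pin<Box<dyn Future<Output = Vec<i32>>>>) -> Poll<Vec<i32>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

Calling `execute_lazy` leaves the counter at zero; the first `drive` call bumps it to one and yields the data.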
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855526070


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));

Review Comment:
   Yup





[GitHub] [arrow-datafusion] andygrove merged pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
andygrove merged PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855424041


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +358,44 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {

Review Comment:
   This is kind of arcane; hopefully the future changes to `ExecutionPlan` as part of #2199 will make these sorts of pipelines easier to write.
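
One reason the code takes this shape is partial borrows: through a `Pin<&mut Self>` every field access re-borrows the whole pinned value, so holding a reference into one field while mutating another is rejected, whereas a plain `&mut self` helper lets the compiler track fields separately. A minimal sketch with a hypothetical `Joiner` type (not the real `CrossJoinStream`), assuming the type is `Unpin`:

```rust
use std::pin::Pin;

/// Hypothetical two-field state, standing in for a cached left side
/// and a right-hand input that is consumed step by step.
struct Joiner {
    left: Vec<i32>,
    right: Vec<i32>,
}

impl Joiner {
    /// Entry point with the pinned receiver that `Stream::poll_next` requires.
    fn next_pair(mut self: Pin<&mut Self>) -> Option<(i32, i32)> {
        // Auto-deref (available because `Joiner: Unpin`) hands the helper a
        // plain `&mut Joiner` in a single step.
        self.next_pair_impl()
    }

    /// With `&mut self`, borrows of `left` and `right` are disjoint.
    fn next_pair_impl(&mut self) -> Option<(i32, i32)> {
        let first_left = self.left.first()?; // shared borrow of `left`
        let right = self.right.pop()?; // mutable borrow of `right`
        Some((*first_left, right))
    }
}
```

Inlining the body of `next_pair_impl` into `next_pair` would fail to compile, because `self.left` and `self.right` would each borrow the entire `Pin`.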





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855425254


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -102,6 +106,37 @@ impl CrossJoinExec {
     }
 }
 
+/// Asynchronously collect the result of the left child
+async fn load(

Review Comment:
   This is the code that used to be evaluated in `ExecutionPlan::execute`





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855507539


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));

Review Comment:
   `ready!` effectively does `return Poll::Pending` if `left_fut` returns `Poll::Pending`, right?
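
For reference, a small sketch of that behaviour using the standard library's `std::task::ready!` (the hunk does not show which `ready!` the file imports, so treat the choice of the std macro here as an assumption). `poll_doubled` and `poll_doubled_expanded` are hypothetical helpers that behave identically:

```rust
use std::task::{ready, Poll};

/// Doubles the inner value once it is available.
fn poll_doubled(inner: Poll<u32>) -> Poll<u32> {
    // `ready!` yields the value for `Poll::Ready(v)` and otherwise
    // returns `Poll::Pending` from the enclosing function.
    let value = ready!(inner);
    Poll::Ready(value * 2)
}

/// The same logic with the early return written out by hand.
fn poll_doubled_expanded(inner: Poll<u32>) -> Poll<u32> {
    let value = match inner {
        Poll::Ready(v) => v,
        Poll::Pending => return Poll::Pending,
    };
    Poll::Ready(value * 2)
}
```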



##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -102,6 +106,37 @@ impl CrossJoinExec {
     }
 }
 
+/// Asynchronously collect the result of the left child
+async fn load(

Review Comment:
   Maybe call it
   ```suggestion
   async fn load_left_input(
   ```
   
   to make it more explicit what is going on?



##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));
+                self.left_result.insert(result)
+            }
+        };
+
+        let left_data = match left_result.as_ref() {
+            Ok(left_data) => left_data,
+            Err(e) => {
+                return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                    e.to_string().into(),
+                ))))
+            }
+        };
+
+        if left_data.num_rows() == 0 {

Review Comment:
   I am not sure this is correct in the context of outer joins -- even if the left data has no rows, the stream may still produce output...
   
   However, I see the original code did the same, so 🤷 
   
   ```rust
   if left_data.num_rows() == 0 {
       return Ok(Box::pin(MemoryStream::try_new(
           vec![],
           self.schema.clone(),
           None,
       )?));
   }
   ```
   
   (it probably only matters for joins that don't have an equality predicate)
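
For a plain cross join the special case is at least consistent: the output pairs every left row with every right row, so zero left rows gives zero output rows regardless of the right side. A small sketch with a hypothetical `cross_join` helper over slices (not the Arrow-based implementation):

```rust
/// Pairs every left row with every right row.
fn cross_join<L: Clone, R: Clone>(left: &[L], right: &[R]) -> Vec<(L, R)> {
    let mut out = Vec::with_capacity(left.len() * right.len());
    for l in left {
        for r in right {
            out.push((l.clone(), r.clone()));
        }
    }
    out
}
```

An outer join would behave differently, since unmatched right rows would still have to be emitted; this sketch does not cover that case.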





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855424376


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +358,44 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));
+                self.left_result.insert(result)
+            }
+        };
+
+        let left_data = match left_result.as_ref() {
+            Ok(left_data) => left_data,
+            Err(e) => {
+                return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                    e.to_string().into(),

Review Comment:
   This is a bit of a hack, as `DataFusionError` isn't cloneable.
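
The shape of the workaround, sketched with hypothetical stand-in types (`LoadError` for an error that is not `Clone`, `StreamError` for the error type the stream must return): the cached result is only borrowed, and each caller receives a fresh error built from the original's message.

```rust
use std::fmt;

/// Hypothetical error type that deliberately does not implement `Clone`.
#[derive(Debug)]
struct LoadError {
    message: String,
}

impl fmt::Display for LoadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

/// Hypothetical downstream error that can wrap any message.
#[derive(Debug, PartialEq)]
struct StreamError(String);

/// Holds a result that may need to be reported more than once.
struct Cached {
    result: Result<Vec<i32>, LoadError>,
}

impl Cached {
    /// Borrows the data, or rebuilds an error from the cached one's text.
    fn get(&self) -> Result<&[i32], StreamError> {
        match self.result.as_ref() {
            Ok(data) => Ok(data.as_slice()),
            Err(e) => Err(StreamError(e.to_string())),
        }
    }
}
```

The cost of this approach is that the original error's type and any structured detail are flattened to a string.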





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855433359


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));

Review Comment:
   futures-rs does have a `MaybeDone` construct, but this seemed simpler to understand.
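
The "future plus `Option` slot" pattern in the hunk can be sketched with only the standard library. `CachedFuture`, `poll_twice`, and `NoopWake` are hypothetical names, and the sketch uses `std::task::ready!` with a plain assignment rather than the `poll_unpin` and `Option::insert` calls in the diff:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll futures that never suspend.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical holder: a boxed future plus a slot for its output.
struct CachedFuture<T> {
    fut: Pin<Box<dyn Future<Output = T>>>,
    result: Option<T>,
}

impl<T> CachedFuture<T> {
    fn new(fut: impl Future<Output = T> + 'static) -> Self {
        Self { fut: Box::pin(fut), result: None }
    }

    /// Polls the future until it completes, then serves the cached output.
    /// The future is never polled again once the slot is filled.
    fn poll_get(&mut self, cx: &mut Context<'_>) -> Poll<&T> {
        if self.result.is_none() {
            let value = ready!(self.fut.as_mut().poll(cx));
            self.result = Some(value);
        }
        Poll::Ready(self.result.as_ref().expect("result was just cached"))
    }
}

/// Polls twice with a no-op waker and returns both observations.
fn poll_twice(cached: &mut CachedFuture<u32>) -> (Poll<u32>, Poll<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let first = cached.poll_get(&mut cx).map(|v| *v);
    let second = cached.poll_get(&mut cx).map(|v| *v);
    (first, second)
}
```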





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855424789


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -143,55 +178,23 @@ impl ExecutionPlan for CrossJoinExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        // we only want to compute the build side once
-        let left_data = {
-            let mut build_side = self.build_side.lock().await;
-
-            match build_side.as_ref() {
-                Some(stream) => stream.clone(),
-                None => {
-                    let start = Instant::now();
-
-                    // merge all left parts into a single stream
-                    let merge = CoalescePartitionsExec::new(self.left.clone());
-                    let stream = merge.execute(0, context.clone()).await?;

Review Comment:
   This is actually a bug in the old implementation: if part of the evaluation errored, it would try again for the next partition.
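
One way to avoid that class of bug is to cache the outcome of the first attempt, error included. A sketch assuming a hypothetical `BuildSide` type built on `std::sync::OnceLock`; the mechanism this PR actually uses to share the left side between partitions is not shown in the quoted hunk.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

/// Hypothetical shared build side: the outcome is stored once,
/// whether it is a success or a failure.
struct BuildSide {
    outcome: OnceLock<Result<Vec<i32>, String>>,
    attempts: AtomicUsize,
}

impl BuildSide {
    fn new() -> Self {
        Self {
            outcome: OnceLock::new(),
            attempts: AtomicUsize::new(0),
        }
    }

    /// Every partition calls this; only the first call runs `load`.
    fn get_or_load(
        &self,
        load: impl FnOnce() -> Result<Vec<i32>, String>,
    ) -> &Result<Vec<i32>, String> {
        self.outcome.get_or_init(|| {
            self.attempts.fetch_add(1, Ordering::SeqCst);
            load()
        })
    }
}
```

A second partition asking for the build side after a failure sees the stored error instead of triggering another evaluation of the left input.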





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2310: Fix CrossJoinExec evaluating during plan

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #2310:
URL: https://github.com/apache/arrow-datafusion/pull/2310#discussion_r855571115


##########
datafusion/core/src/physical_plan/cross_join.rs:
##########
@@ -352,14 +364,46 @@ impl Stream for CrossJoinStream {
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        if self.left_index > 0 && self.left_index < self.left_data.num_rows() {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl CrossJoinStream {
+    /// Separate implementation function that unpins the [`CrossJoinStream`] so
+    /// that partial borrows work correctly
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
+        let left_result = match &self.left_result {
+            Some(data) => data,
+            None => {
+                let result = ready!(self.left_fut.poll_unpin(cx));
+                self.left_result.insert(result)
+            }
+        };
+
+        let left_data = match left_result.as_ref() {
+            Ok(left_data) => left_data,
+            Err(e) => {
+                return Poll::Ready(Some(Err(ArrowError::ExternalError(
+                    e.to_string().into(),
+                ))))
+            }
+        };
+
+        if left_data.num_rows() == 0 {

Review Comment:
   Yeah, I just blindly reproduced the existing behaviour - I presumed this special case was there for a reason


