Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/03 20:47:16 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

jorgecarleitao opened a new pull request #9086:
URL: https://github.com/apache/arrow/pull/9086


   The motivation behind this PR is that Tokio does not cope well with blocking (e.g. CPU-intensive) operations because, by definition, they prevent the scheduler from switching to other tasks. For this reason, Tokio recommends (throughout its documentation, but most notably [here](https://docs.rs/tokio/1.0.1/tokio/index.html#cpu-bound-tasks-and-blocking-code)) using `spawn_blocking` or `rayon` to handle blocking work, such as blocking IO and CPU-bound tasks.
   
   This PR is just an experiment / proposal / idea of how we could handle this within Tokio. Specifically, it uses `spawn_blocking` to run a blocking operation on Tokio's dedicated blocking thread pool, thereby avoiding starving the thread pool that drives `async` tasks.
   
   I do not expect this code to make much difference in performance, as `filter` is not much of a blocking operation compared to e.g. a group by. However, I think that this approach could address performance issues when we have multiple stages (as one stage currently blocks the whole thread because we perform blocking operations inside `async` code).
   
   @andygrove @alamb @Dandandan, I have been looking at DataFusion's code and Tokio's documentation, and I hypothesize that this would be one way to follow Tokio's recommendations for our use case, but I would really like to get your opinions.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551054748



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       This is the core idea: wrap the input stream with a blocking task that runs `batch_filter` (the offending blocking code).







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551054627



##########
File path: rust/datafusion/src/physical_plan/sized_stream.rs
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SizedRecordBatchStream is a stream of [arrow::record_batch::RecordBatch] that knows its schema.
+
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use arrow::record_batch::RecordBatch;
+use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
+use futures::stream::{Stream, StreamExt};
+
+use super::RecordBatchStream;
+
+/// A schema-aware [Stream] of [`RecordBatch`]es.
+pub struct SizedRecordBatchStream<S: Stream<Item = ArrowResult<RecordBatch>>> {
+    stream: Pin<Box<S>>,
+    schema: SchemaRef,
+}

Review comment:
       This is just a way of encapsulating a stream into something that implements `RecordBatchStream`. Not really relevant for this PR's idea, just simplifies some common code.







[GitHub] [arrow] Dandandan commented on pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#issuecomment-754162220


   @jorgecarleitao 
   
   This is really cool, thanks for creating this experiment!
   I am not very deep into the Rust way of doing parallelism yet, but the Tokio documentation makes sense to me.
   
   Some ideas:
   
   * In general, I think it is best if the parallelism is at as high a level as possible, to reduce the overhead related to scheduling, context switching, etc.
   * But in order to utilize parallelism best it should be fine-grained enough.
   * I think there is a balance to strike in how much control we have over parallelism. I think Spark's concurrency via partitions is an example where you have a larger amount of control over it. It is not always fine-grained enough, e.g. if you have only one or a couple of files as input.
   * I think filtering batches is relatively fine-grained, so I am wondering if this is a good level for parallelism.
   
   * Tokio's default `max_blocking_threads` is 512, which I think is very large for CPU-intensive work (and would have a negative effect on performance): https://docs.rs/tokio/1.0.1/tokio/runtime/struct.Builder.html#method.max_blocking_threads. Maybe, if using different "scopes", it makes sense to use a different runtime for CPU-intensive work, configured with a different `max_blocking_threads`?
   * Tokio's documentation seems to hint that Rayon would be a better choice for CPU-intensive work?
   * In `ParquetExec`, `thread::spawn` is being used. `task::spawn_blocking` seems a better choice there, as it handles errors in a better way and can limit the number of threads compared to `thread::spawn`, I guess?
   
   * Just like the statistics that @andygrove started to add for `Exec`s, I think it would be good to have something here as well, to debug issues and make sure we are not doing things inefficiently.
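
To illustrate the point about thread counts: CPU-bound work generally gains nothing from running on more threads than there are cores. The following is a minimal std-only sketch of a pool whose size is capped explicitly. It is not Tokio's blocking pool and not DataFusion code; `run_on_pool` and its parameters are hypothetical names used only for illustration.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Runs `jobs` on at most `max_threads` worker threads and returns the
/// results in the original job order. Hypothetical helper, std only.
fn run_on_pool<T, F>(jobs: Vec<F>, max_threads: usize) -> Vec<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let n = jobs.len();
    // Shared queue of (index, job); workers pop from it until it is empty.
    let queue = Arc::new(Mutex::new(jobs.into_iter().enumerate().collect::<Vec<_>>()));
    let (tx, rx) = mpsc::channel();

    let worker_count = max_threads.max(1).min(n.max(1));
    let workers: Vec<_> = (0..worker_count)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only while taking a job, not while running it.
                let next = queue.lock().unwrap().pop();
                match next {
                    Some((i, job)) => tx.send((i, job())).unwrap(),
                    None => break,
                }
            })
        })
        .collect();
    drop(tx); // the receiver ends once every worker has finished

    let mut results: Vec<Option<T>> = (0..n).map(|_| None).collect();
    for (i, value) in rx {
        results[i] = Some(value);
    }
    for worker in workers {
        worker.join().unwrap();
    }
    results.into_iter().map(|r| r.unwrap()).collect()
}

fn main() {
    // Cap the pool at the number of available cores rather than a large constant.
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    let jobs: Vec<_> = (1..=8u64).map(|i| move || (1..=i).product::<u64>()).collect();
    println!("{:?}", run_on_pool(jobs, cores));
}
```

With Tokio itself, the analogous knob is the `max_blocking_threads` setting on the runtime builder mentioned above.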





[GitHub] [arrow] codecov-io commented on pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#issuecomment-753678431


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/9086?src=pr&el=h1) Report
   > Merging [#9086](https://codecov.io/gh/apache/arrow/pull/9086?src=pr&el=desc) (c1702e2) into [master](https://codecov.io/gh/apache/arrow/commit/86cf246d161512923253baa8fe62e00de88db73a?el=desc) (86cf246) will **decrease** coverage by `0.00%`.
   > The diff coverage is `64.70%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/9086/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/9086?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #9086      +/-   ##
   ==========================================
   - Coverage   82.60%   82.60%   -0.01%     
   ==========================================
     Files         204      205       +1     
     Lines       50175    50180       +5     
   ==========================================
   + Hits        41447    41451       +4     
   - Misses       8728     8729       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/9086?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow/pull/9086/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL21vZC5ycw==) | `86.00% <ø> (ø)` | |
   | [rust/datafusion/src/physical\_plan/sized\_stream.rs](https://codecov.io/gh/apache/arrow/pull/9086/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL3NpemVkX3N0cmVhbS5ycw==) | `50.00% <50.00%> (ø)` | |
   | [rust/datafusion/src/physical\_plan/filter.rs](https://codecov.io/gh/apache/arrow/pull/9086/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2ZpbHRlci5ycw==) | `76.11% <77.77%> (+1.83%)` | :arrow_up: |
   | [...t/datafusion/src/physical\_plan/coalesce\_batches.rs](https://codecov.io/gh/apache/arrow/pull/9086/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9waHlzaWNhbF9wbGFuL2NvYWxlc2NlX2JhdGNoZXMucnM=) | `89.21% <0.00%> (+0.98%)` | :arrow_up: |
   
   





[GitHub] [arrow] alamb commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r552191823



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       My comment regarding channels was a response to @Dandandan's and @jorgecarleitao's discussion of potentially getting more parallelism going. My comments regarding the thread pool / `spawn_blocking` were in response to this PR in general.










[GitHub] [arrow] alamb commented on pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#issuecomment-789646295


   @jorgecarleitao I am closing this PR for the time being to clean up the Rust/Arrow PR backlog. Please let me know if this is a mistake.





[GitHub] [arrow] andygrove commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r552008306



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       Thanks @jorgecarleitao this seems to make a lot of sense.
   
   @alamb Was your comment a response to @Dandandan or an objection to using `spawn_blocking`? I wasn't sure.







[GitHub] [arrow] Dandandan commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551512194



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       Does this code run multiple `batch_filter` calls for different batches at the same time?







[GitHub] [arrow] alamb commented on pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#issuecomment-778610466


   @jorgecarleitao 
   What is the status of this PR?
   
   As part of trying to clean up the backlog of Rust PRs in this repo, I am going through seemingly stale PRs and pinging the authors to see if there are any plans to continue the work or conversation.





[GitHub] [arrow] Dandandan commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551522877



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       It looks to me like it currently does not?







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551702345



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       No, but this did not happen in master either: a stream does not ask for the next item until the current item is completed. We get no parallelism whatsoever from streams, only concurrency (because they yield). See e.g. https://docs.rs/parallel-stream/2.1.2/parallel_stream/#differences-with-rayon
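
To make the pull-based behaviour concrete, here is a minimal sketch that uses a plain `Iterator` as a synchronous stand-in for a `Stream`. That substitution is an assumption made for illustration: the code uses neither `futures` nor DataFusion, and `pull_based_trace` is a hypothetical helper that only logs when each stage runs.

```rust
use std::cell::RefCell;

/// Drives a two-stage lazy pipeline over `n` items and returns the order in
/// which the stages ran. Hypothetical helper for illustration only.
fn pull_based_trace(n: u32) -> Vec<String> {
    let log = RefCell::new(Vec::new());

    // Upstream stage: stands in for the input stream producing batches.
    let source = (0..n).map(|i| {
        log.borrow_mut().push(format!("produce {}", i));
        i
    });

    // Downstream stage: stands in for the per-batch filter.
    let filtered = source.map(|i| {
        log.borrow_mut().push(format!("filter {}", i));
        i
    });

    // Each pull runs exactly one item through every stage before the next
    // item is requested from upstream.
    for _ in filtered {}

    log.into_inner()
}

fn main() {
    for line in pull_based_trace(3) {
        println!("{}", line);
    }
}
```

The trace alternates `produce` and `filter` strictly one item at a time. Nothing in the pipeline itself causes two items to be processed simultaneously, which is the sense in which a stream gives ordering and yielding but not parallelism.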
   







[GitHub] [arrow] alamb closed pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #9086:
URL: https://github.com/apache/arrow/pull/9086


   





[GitHub] [arrow] github-actions[bot] commented on pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#issuecomment-753675326


   Thanks for opening a pull request!
   
   Could you open an issue for this pull request on JIRA?
   https://issues.apache.org/jira/browse/ARROW
   
   Then could you also rename pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   





[GitHub] [arrow] alamb commented on a change in pull request #9086: [Rust] [DataFusion] [Experiment] Blocking threads filter

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9086:
URL: https://github.com/apache/arrow/pull/9086#discussion_r551883985



##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -103,25 +103,23 @@ impl ExecutionPlan for FilterExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(FilterExecStream {
-            schema: self.input.schema().clone(),
-            predicate: self.predicate.clone(),
-            input: self.input.execute(partition).await?,
-        }))
+        let predicate = self.predicate.clone();
+
+        let stream = self.input.execute(partition).await?;
+        let stream = stream.then(move |batch| {
+            let predicate = predicate.clone();
+            async move {
+                // Filtering batches is CPU-bounded and therefore justifies a dedicated thread pool
+                task::spawn_blocking(move || batch_filter(&batch?, &predicate))
+                    .await
+                    .unwrap()
+            }
+        });
+
+        Ok(Box::pin(SizedRecordBatchStream::new(stream, self.schema())))

Review comment:
       I think buffering with a bounded channel is the Rust-y way to get more parallel execution without unlimited buffering.
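
As a rough sketch of that idea, the following uses the standard library's bounded `sync_channel` and plain threads rather than Tokio, so that it stays self-contained. In async code one would presumably reach for a bounded async channel such as Tokio's `mpsc::channel` instead. `filter_pipelined`, the `Vec<i32>` "batches", and the even-number predicate are all hypothetical stand-ins for record batches and `batch_filter`.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Produces `batches` on one thread and filters them on another, connected
/// by a channel that holds at most `capacity` pending batches.
fn filter_pipelined(batches: Vec<Vec<i32>>, capacity: usize) -> Vec<Vec<i32>> {
    let (tx, rx) = sync_channel::<Vec<i32>>(capacity);

    // Producer stage: `send` blocks while the buffer is full, so memory use
    // stays bounded even if the consumer is slower.
    let producer = thread::spawn(move || {
        for batch in batches {
            if tx.send(batch).is_err() {
                break; // the consumer went away
            }
        }
        // `tx` is dropped here, which ends the consumer's iteration.
    });

    // Consumer stage: runs in parallel with the producer thread.
    let filtered: Vec<Vec<i32>> = rx
        .into_iter()
        .map(|batch| batch.into_iter().filter(|v| v % 2 == 0).collect::<Vec<i32>>())
        .collect();

    producer.join().unwrap();
    filtered
}

fn main() {
    let batches = vec![vec![1, 2, 3, 4], vec![5, 6], vec![7]];
    println!("{:?}", filter_pipelined(batches, 1));
}
```

The channel capacity is the tuning knob: a larger buffer lets the upstream stage run further ahead of the filter, at the cost of holding more batches in memory.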



