You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/04/17 22:44:45 UTC
[arrow] branch master updated: ARROW-12421: [Rust] [DataFusion] Fix
topkexec failure
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 27c4fa2 ARROW-12421: [Rust] [DataFusion] Fix topkexec failure
27c4fa2 is described below
commit 27c4fa20a7bc24f53c2d849d21855d8784e1f45f
Author: Heres, Daniel <da...@gmail.com>
AuthorDate: Sat Apr 17 16:43:35 2021 -0600
ARROW-12421: [Rust] [DataFusion] Fix topkexec failure
This reproduces the test failure for the `TopKExec` test that @andygrove was seeing in combination with a 24-core machine.
Also FYI @alamb
(Marked as ready for review to trigger CI)
Closes #10077 from Dandandan/top_k_exec_fail
Authored-by: Heres, Daniel <da...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/execution/context.rs | 2 ++
rust/datafusion/tests/user_defined_plan.rs | 50 +++++++++++++-----------------
2 files changed, 23 insertions(+), 29 deletions(-)
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index b0f86ec..c83ca4d 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -59,6 +59,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddMergeExec;
+use crate::physical_optimizer::repartition::Repartition;
use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -642,6 +643,7 @@ impl ExecutionConfig {
],
physical_optimizers: vec![
Arc::new(CoalesceBatches::new()),
+ Arc::new(Repartition::new()),
Arc::new(AddMergeExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs
index aae5c59..f9f2443 100644
--- a/rust/datafusion/tests/user_defined_plan.rs
+++ b/rust/datafusion/tests/user_defined_plan.rs
@@ -58,7 +58,7 @@
//! N elements, reducing the total amount of required buffer memory.
//!
-use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
+use futures::{Stream, StreamExt};
use arrow::{
array::{Int64Array, StringArray},
@@ -180,6 +180,7 @@ async fn topk_plan() -> Result<()> {
fn make_topk_context() -> ExecutionContext {
let config = ExecutionConfig::new()
.with_query_planner(Arc::new(TopKQueryPlanner {}))
+ .with_concurrency(48)
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
ExecutionContext::with_config(config)
@@ -388,6 +389,7 @@ impl ExecutionPlan for TopKExec {
input: self.input.execute(partition).await?,
k: self.k,
done: false,
+ state: BTreeMap::new(),
}))
}
}
@@ -400,6 +402,8 @@ struct TopKReader {
k: usize,
/// Have we produced the output yet?
done: bool,
+ /// Output
+ state: BTreeMap<i64, String>,
}
/// Keeps track of the revenue from customer_id and stores if it
@@ -432,7 +436,7 @@ fn accumulate_batch(
input_batch: &RecordBatch,
mut top_values: BTreeMap<i64, String>,
k: &usize,
-) -> Result<BTreeMap<i64, String>> {
+) -> BTreeMap<i64, String> {
let num_rows = input_batch.num_rows();
// Assuming the input columns are
// column[0]: customer_id / UTF8
@@ -457,7 +461,7 @@ fn accumulate_batch(
k,
);
}
- Ok(top_values)
+ top_values
}
impl Stream for TopKReader {
@@ -475,41 +479,29 @@ impl Stream for TopKReader {
// take this as immutable
let k = self.k;
let schema = self.schema();
- let top_values = self
- .input
- .as_mut()
- // Hard coded implementation for sales / customer_id example as BTree
- .try_fold(
- BTreeMap::<i64, String>::new(),
- move |top_values, batch| async move {
- accumulate_batch(&batch, top_values, &k)
- .map_err(DataFusionError::into_arrow_external_error)
- },
- );
-
- let top_values = top_values.map(|top_values| match top_values {
- Ok(top_values) => {
- // make output by walking over the map backwards (so values are descending)
+ let poll = self.input.poll_next_unpin(cx);
+
+ match poll {
+ Poll::Ready(Some(Ok(batch))) => {
+ self.state = accumulate_batch(&batch, self.state.clone(), &k);
+ Poll::Ready(Some(Ok(RecordBatch::new_empty(schema))))
+ }
+ Poll::Ready(None) => {
+ self.done = true;
let (revenue, customer): (Vec<i64>, Vec<&String>) =
- top_values.iter().rev().unzip();
+ self.state.iter().rev().unzip();
let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect();
- Ok(RecordBatch::try_new(
+ Poll::Ready(Some(RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(customer)),
Arc::new(Int64Array::from(revenue)),
],
- )?)
+ )))
}
- Err(e) => Err(e),
- });
- let mut top_values = Box::pin(top_values.into_stream());
-
- top_values.poll_next_unpin(cx).map(|batch| {
- self.done = true;
- batch
- })
+ other => other,
+ }
}
}