You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/01/21 12:44:45 UTC
[arrow] branch master updated: ARROW-11323: [Rust][DataFusion]
Allow sort queries to return no results
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 84126d5 ARROW-11323: [Rust][DataFusion] Allow sort queries to return no results
84126d5 is described below
commit 84126d5d70e9f05f82616e9a8506b53fe1df4a22
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jan 21 07:43:41 2021 -0500
ARROW-11323: [Rust][DataFusion] Allow sort queries to return no results
Prior to this PR, if a plan had an ORDER BY (Sort) that received no input rows, running the query returned an error instead of an empty result.
Now the new test passes and the query produces the expected output: no rows.
Closes #9275 from alamb/alamb/ARROW-11323-empty-results
Authored-by: Andrew Lamb <an...@nerdnetworks.org>
Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
rust/datafusion/src/execution/context.rs | 31 +++++++++++++++++++++++++++++++
rust/datafusion/src/physical_plan/sort.rs | 18 +++++++++++-------
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index b4b715b..5a03693 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -854,6 +854,19 @@ mod tests {
}
#[tokio::test]
+ async fn sort_empty() -> Result<()> {
+ // The predicate on this query purposely generates no results
+ let results = execute(
+ "SELECT c1, c2 FROM test WHERE c1 > 100000 ORDER BY c1 DESC, c2 ASC",
+ 4,
+ )
+ .await
+ .unwrap();
+ assert_eq!(results.len(), 0);
+ Ok(())
+ }
+
+ #[tokio::test]
async fn aggregate() -> Result<()> {
let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
assert_eq!(results.len(), 1);
@@ -871,6 +884,24 @@ mod tests {
}
#[tokio::test]
+ async fn aggregate_empty() -> Result<()> {
+ // The predicate on this query purposely generates no results
+ let results = execute("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
+ .await
+ .unwrap();
+
+ assert_eq!(results.len(), 1);
+ let batch = &results[0];
+
+ let expected: Vec<&str> = vec!["NULL,NULL"];
+ let mut rows = test::format_batch(&batch);
+ rows.sort();
+ assert_eq!(rows, expected);
+
+ Ok(())
+ }
+
+ #[tokio::test]
async fn aggregate_avg() -> Result<()> {
let results = execute("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
assert_eq!(results.len(), 1);
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 8565540..03e9d6a 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -141,7 +141,10 @@ fn sort_batches(
batches: &Vec<RecordBatch>,
schema: &SchemaRef,
expr: &[PhysicalSortExpr],
-) -> ArrowResult<RecordBatch> {
+) -> ArrowResult<Option<RecordBatch>> {
+ if batches.is_empty() {
+ return Ok(None);
+ }
// combine all record batches into one for each column
let combined_batch = RecordBatch::try_new(
schema.clone(),
@@ -170,7 +173,7 @@ fn sort_batches(
)?;
// reorder all rows based on sorted indices
- RecordBatch::try_new(
+ let sorted_batch = RecordBatch::try_new(
schema.clone(),
combined_batch
.columns()
@@ -187,13 +190,14 @@ fn sort_batches(
)
})
.collect::<ArrowResult<Vec<ArrayRef>>>()?,
- )
+ );
+ sorted_batch.map(Some)
}
pin_project! {
struct SortStream {
#[pin]
- output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+ output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
}
@@ -240,10 +244,10 @@ impl Stream for SortStream {
// check for error in receiving channel and unwrap actual result
let result = match result {
- Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
- Ok(result) => result,
+ Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
+ Ok(result) => result.transpose(),
};
- Poll::Ready(Some(result))
+ Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}