You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/01/21 12:44:45 UTC

[arrow] branch master updated: ARROW-11323: [Rust][DataFusion] Allow sort queries to return no results

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 84126d5  ARROW-11323: [Rust][DataFusion] Allow sort queries to return no results
84126d5 is described below

commit 84126d5d70e9f05f82616e9a8506b53fe1df4a22
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Jan 21 07:43:41 2021 -0500

    ARROW-11323: [Rust][DataFusion] Allow sort queries to return no results
    
    Prior to this PR, if a plan had an ORDER BY (Sort) that received no input rows, the query returned an error instead of an empty result.
    
    Now the test passes and the query produces no output rows, as expected.
    
    Closes #9275 from alamb/alamb/ARROW-11323-empty-results
    
    Authored-by: Andrew Lamb <an...@nerdnetworks.org>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/datafusion/src/execution/context.rs  | 31 +++++++++++++++++++++++++++++++
 rust/datafusion/src/physical_plan/sort.rs | 18 +++++++++++-------
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index b4b715b..5a03693 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -854,6 +854,19 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn sort_empty() -> Result<()> {
+        // The predicate on this query purposely generates no results
+        let results = execute(
+            "SELECT c1, c2 FROM test WHERE c1 > 100000 ORDER BY c1 DESC, c2 ASC",
+            4,
+        )
+        .await
+        .unwrap();
+        assert_eq!(results.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn aggregate() -> Result<()> {
         let results = execute("SELECT SUM(c1), SUM(c2) FROM test", 4).await?;
         assert_eq!(results.len(), 1);
@@ -871,6 +884,24 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn aggregate_empty() -> Result<()> {
+        // The predicate on this query purposely generates no results
+        let results = execute("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4)
+            .await
+            .unwrap();
+
+        assert_eq!(results.len(), 1);
+        let batch = &results[0];
+
+        let expected: Vec<&str> = vec!["NULL,NULL"];
+        let mut rows = test::format_batch(&batch);
+        rows.sort();
+        assert_eq!(rows, expected);
+
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn aggregate_avg() -> Result<()> {
         let results = execute("SELECT AVG(c1), AVG(c2) FROM test", 4).await?;
         assert_eq!(results.len(), 1);
diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs
index 8565540..03e9d6a 100644
--- a/rust/datafusion/src/physical_plan/sort.rs
+++ b/rust/datafusion/src/physical_plan/sort.rs
@@ -141,7 +141,10 @@ fn sort_batches(
     batches: &Vec<RecordBatch>,
     schema: &SchemaRef,
     expr: &[PhysicalSortExpr],
-) -> ArrowResult<RecordBatch> {
+) -> ArrowResult<Option<RecordBatch>> {
+    if batches.is_empty() {
+        return Ok(None);
+    }
     // combine all record batches into one for each column
     let combined_batch = RecordBatch::try_new(
         schema.clone(),
@@ -170,7 +173,7 @@ fn sort_batches(
     )?;
 
     // reorder all rows based on sorted indices
-    RecordBatch::try_new(
+    let sorted_batch = RecordBatch::try_new(
         schema.clone(),
         combined_batch
             .columns()
@@ -187,13 +190,14 @@ fn sort_batches(
                 )
             })
             .collect::<ArrowResult<Vec<ArrayRef>>>()?,
-    )
+    );
+    sorted_batch.map(Some)
 }
 
 pin_project! {
     struct SortStream {
         #[pin]
-        output: futures::channel::oneshot::Receiver<ArrowResult<RecordBatch>>,
+        output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
         finished: bool,
         schema: SchemaRef,
     }
@@ -240,10 +244,10 @@ impl Stream for SortStream {
 
                 // check for error in receiving channel and unwrap actual result
                 let result = match result {
-                    Err(e) => Err(ArrowError::ExternalError(Box::new(e))), // error receiving
-                    Ok(result) => result,
+                    Err(e) => Some(Err(ArrowError::ExternalError(Box::new(e)))), // error receiving
+                    Ok(result) => result.transpose(),
                 };
-                Poll::Ready(Some(result))
+                Poll::Ready(result)
             }
             Poll::Pending => Poll::Pending,
         }