You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/21 14:38:41 UTC

[GitHub] [arrow-datafusion] tustvold opened a new pull request #379: Sort preserving merge (#362)

tustvold opened a new pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379


   Closes #362.
   
   Creating as draft as currently builds on top of #377 as it uses a partitioned SortExec as part of its tests.
   
   This PR adds a SortPreservingMergeExec operator that allows merging together multiple sorted partitions into a single partition.
   
   The main implementation is contained within SortPreservingMergeStream and SortKeyCursor:
   
   `SortKeyCursor` provides the ability to compare the sort keys of the next row that could be yielded for each stream, in order to determine which one to yield.
   
   `SortPreservingMergeStream` maintains a list of `SortKeyCursor` for each stream and builds up a list of sorted indices identifying rows within these cursors. When it reads the last row of a RecordBatch, it fetches another from the input. Once it has accumulated target_batch_size` row indexes (or exhausted all input streams) it will combine the relevant rows from the buffered RecordBatches into a single RecordBatch, drop any cursors it no longer needs, and yield the batch.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r638103766



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {

Review comment:
       great suggestion @jhorstmann  -- thank you -- I  filed https://github.com/apache/arrow-datafusion/issues/416 so it is more visible




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r637921645



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {
+        // Mapping from stream index to the index of the first buffer from that stream
+        let mut buffer_idx = 0;
+        let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
+
+        for cursors in &self.cursors {
+            stream_to_buffer_idx.push(buffer_idx);
+            buffer_idx += cursors.len();
+        }
+
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
+                    .cursors
+                    .iter()
+                    .flat_map(|cursor| {
+                        cursor
+                            .iter()
+                            .map(|cursor| cursor.batch.column(column_idx).data())
+                    })
+                    .collect();
+
+                let mut array_data = MutableArrayData::new(
+                    arrays,
+                    field.is_nullable(),
+                    self.in_progress.len(),
+                );
+
+                for row_index in &self.in_progress {
+                    let buffer_idx =
+                        stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
+
+                    // TODO: Coalesce contiguous writes
+                    array_data.extend(
+                        buffer_idx,
+                        row_index.row_idx,
+                        row_index.row_idx + 1,
+                    );
+                }
+
+                arrow::array::make_array(array_data.freeze())
+            })
+            .collect();
+
+        self.in_progress.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last cursor
+        // for each stream have been yielded to the newly created record batch
+        //
+        // Additionally as `in_progress` has been drained, there are no longer
+        // any RowIndex's reliant on the cursor indexes
+        //
+        // We can therefore drop all but the last cursor for each stream
+        for cursors in &mut self.cursors {
+            if cursors.len() > 1 {
+                // Drain all but the last cursor
+                cursors.drain(0..(cursors.len() - 1));
+            }
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns)
+            .expect("SortPreservingMergeStream: merged record batch was invalid")
+    }
+}
+
+impl Stream for SortPreservingMergeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.aborted {
+            return Poll::Ready(None);
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..self.cursors.len() {
+            match futures::ready!(self.maybe_poll_stream(cx, i)) {
+                Ok(_) => {}
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+            }
+        }
+
+        loop {
+            let stream_idx = match self.next_stream_idx() {
+                Ok(Some(idx)) => idx,
+                Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
+                Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
+                        e,
+                    )))));
+                }
+            };
+
+            let cursors = &mut self.cursors[stream_idx];
+            let cursor_idx = cursors.len() - 1;
+            let cursor = cursors.back_mut().unwrap();
+            let row_idx = cursor.advance();
+            let cursor_finished = cursor.is_finished();
+
+            self.in_progress.push(RowIndex {
+                stream_idx,
+                cursor_idx,
+                row_idx,
+            });
+
+            if self.in_progress.len() == self.target_batch_size {
+                return Poll::Ready(Some(Ok(self.build_record_batch())));
+            }
+
+            // If removed the last row from the cursor, need to fetch a new record
+            // batch if possible, before looping round again
+            if cursor_finished {
+                match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for SortPreservingMergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter::FromIterator;
+
+    use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use crate::assert_batches_eq;
+    use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::csv::CsvExec;
+    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::sort::SortExec;
+    use crate::physical_plan::{collect, common};
+    use crate::test;
+
+    use super::*;
+    use futures::SinkExt;
+    use tokio_stream::StreamExt;
+
+    #[tokio::test]
+    async fn test_merge() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("d"),
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: Default::default(),
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: Default::default(),
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
+                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
+                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
+                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
+                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
+                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
+                "+---+---+-------------------------------+",
+            ],
+            collected.as_slice()
+        );
+    }
+
+    async fn sorted_merge(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let mut result = collect(merge).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    async fn partition_sort(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let sort_exec = Arc::new(SortExec::try_new(sort.clone(), input).unwrap());
+        sorted_merge(sort_exec, sort).await
+    }
+
+    async fn basic_sort(
+        src: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(MergeExec::new(src));
+        let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
+        let mut result = collect(sort_exec).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort() {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("c2"),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let basic = basic_sort(csv.clone(), sort.clone()).await;
+        let partition = partition_sort(csv, sort).await;
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    // Split the provided record batch into multiple batch_size record batches
+    fn split_batch(sorted: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
+        let batches = (sorted.num_rows() + batch_size - 1) / batch_size;
+
+        // Split the sorted RecordBatch into multiple
+        (0..batches)
+            .into_iter()
+            .map(|batch_idx| {
+                let columns = (0..sorted.num_columns())
+                    .map(|column_idx| {
+                        let length =
+                            batch_size.min(sorted.num_rows() - batch_idx * batch_size);
+
+                        sorted
+                            .column(column_idx)
+                            .slice(batch_idx * batch_size, length)
+                    })
+                    .collect();
+
+                RecordBatch::try_new(sorted.schema(), columns).unwrap()
+            })
+            .collect()
+    }
+
+    async fn sorted_partitioned_input(
+        sort: Vec<PhysicalSortExpr>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sorted = basic_sort(csv, sort).await;
+        let split = split_batch(&sorted, 10);
+
+        Arc::new(
+            MemoryExec::try_new(
+                &[split.clone(), split.clone(), split],
+                sorted.schema(),
+                None,
+            )
+            .unwrap(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input() {
+        let sort = vec![
+            // uint8
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: Default::default(),
+            },
+            // int16
+            PhysicalSortExpr {
+                expr: col("c4"),
+                options: Default::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c13"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input = sorted_partitioned_input(sort.clone()).await;
+        let basic = basic_sort(input.clone(), sort.clone()).await;
+        let partition = sorted_merge(input, sort).await;
+
+        assert_eq!(basic.num_rows(), 300);
+        assert_eq!(partition.num_rows(), 300);
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input_output() {
+        let sort = vec![
+            // float64
+            PhysicalSortExpr {
+                expr: col("c12"),
+                options: Default::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c13"),
+                options: Default::default(),
+            },
+        ];
+
+        let input = sorted_partitioned_input(sort.clone()).await;
+        let basic = basic_sort(input.clone(), sort.clone()).await;
+
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 23));
+        let merged = collect(merge).await.unwrap();
+
+        assert_eq!(merged.len(), 14);
+
+        assert_eq!(basic.num_rows(), 300);
+        assert_eq!(merged.iter().map(|x| x.num_rows()).sum::<usize>(), 300);
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition =
+            arrow::util::pretty::pretty_format_batches(merged.as_slice()).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    #[tokio::test]
+    async fn test_nulls() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            None,
+            Some("a"),
+            Some("b"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(8),
+            None,
+            Some(6),
+            None,
+            Some(4),
+        ]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            None,
+            Some("b"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(8),
+            None,
+            Some(5),
+            None,
+            Some(4),
+        ]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    },
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 |   | 1970-01-01 00:00:00.000000008 |",
+                "| 1 |   | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | a |                               |",

Review comment:
       The sort key is just `b` and `c` so the lines
   
   ```
   "| 7 | b | 1970-01-01 00:00:00.000000006 |",
   "| 2 | b |                               |",
   ```
   
   test this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-852183554


   I just fixed a merge conflict -- if the tests pass I plan to merge this PR in


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r638098474



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {
+        // Mapping from stream index to the index of the first buffer from that stream
+        let mut buffer_idx = 0;
+        let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
+
+        for cursors in &self.cursors {
+            stream_to_buffer_idx.push(buffer_idx);
+            buffer_idx += cursors.len();
+        }
+
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
+                    .cursors
+                    .iter()
+                    .flat_map(|cursor| {
+                        cursor
+                            .iter()
+                            .map(|cursor| cursor.batch.column(column_idx).data())
+                    })
+                    .collect();
+
+                let mut array_data = MutableArrayData::new(
+                    arrays,
+                    field.is_nullable(),
+                    self.in_progress.len(),
+                );
+
+                for row_index in &self.in_progress {
+                    let buffer_idx =
+                        stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
+
+                    // TODO: Coalesce contiguous writes
+                    array_data.extend(
+                        buffer_idx,
+                        row_index.row_idx,
+                        row_index.row_idx + 1,
+                    );
+                }
+
+                arrow::array::make_array(array_data.freeze())
+            })
+            .collect();
+
+        self.in_progress.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last cursor
+        // for each stream have been yielded to the newly created record batch
+        //
+        // Additionally as `in_progress` has been drained, there are no longer
+        // any RowIndex's reliant on the cursor indexes
+        //
+        // We can therefore drop all but the last cursor for each stream
+        for cursors in &mut self.cursors {
+            if cursors.len() > 1 {
+                // Drain all but the last cursor
+                cursors.drain(0..(cursors.len() - 1));
+            }
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns)
+            .expect("SortPreservingMergeStream: merged record batch was invalid")
+    }
+}
+
+impl Stream for SortPreservingMergeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.aborted {
+            return Poll::Ready(None);
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..self.cursors.len() {
+            match futures::ready!(self.maybe_poll_stream(cx, i)) {
+                Ok(_) => {}
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+            }
+        }
+
+        loop {
+            let stream_idx = match self.next_stream_idx() {
+                Ok(Some(idx)) => idx,
+                Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
+                Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
+                        e,
+                    )))));
+                }
+            };
+
+            let cursors = &mut self.cursors[stream_idx];
+            let cursor_idx = cursors.len() - 1;
+            let cursor = cursors.back_mut().unwrap();
+            let row_idx = cursor.advance();
+            let cursor_finished = cursor.is_finished();
+
+            self.in_progress.push(RowIndex {
+                stream_idx,
+                cursor_idx,
+                row_idx,
+            });
+
+            if self.in_progress.len() == self.target_batch_size {
+                return Poll::Ready(Some(Ok(self.build_record_batch())));
+            }
+
+            // If removed the last row from the cursor, need to fetch a new record
+            // batch if possible, before looping round again
+            if cursor_finished {
+                match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for SortPreservingMergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter::FromIterator;
+
+    use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use crate::assert_batches_eq;
+    use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::csv::CsvExec;
+    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::sort::SortExec;
+    use crate::physical_plan::{collect, common};
+    use crate::test;
+
+    use super::*;
+    use futures::SinkExt;
+    use tokio_stream::StreamExt;
+
+    #[tokio::test]
+    async fn test_merge() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("d"),
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: Default::default(),
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: Default::default(),
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
+                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
+                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
+                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
+                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
+                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
+                "+---+---+-------------------------------+",
+            ],
+            collected.as_slice()
+        );
+    }
+
+    async fn sorted_merge(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let mut result = collect(merge).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    async fn partition_sort(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let sort_exec = Arc::new(SortExec::try_new(sort.clone(), input).unwrap());
+        sorted_merge(sort_exec, sort).await
+    }
+
+    async fn basic_sort(
+        src: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(MergeExec::new(src));
+        let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
+        let mut result = collect(sort_exec).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort() {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("c2"),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let basic = basic_sort(csv.clone(), sort.clone()).await;
+        let partition = partition_sort(csv, sort).await;
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    // Split the provided record batch into multiple batch_size record batches

Review comment:
       https://github.com/apache/arrow-rs/issues/343




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-849093913


   This PR appears to need some rebasing / test fixing love: 
   
   https://github.com/apache/arrow-datafusion/pull/379/checks?check_run_id=2674096854
   
   ```
   
   
   ---- physical_plan::sort_preserving_merge::tests::test_partition_sort stdout ----
   thread 'physical_plan::sort_preserving_merge::tests::test_partition_sort' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("SortExec requires a single input partition")', datafusion/src/physical_plan/sort_preserving_merge.rs:627:47
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
   
   failures:
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r636973314



##########
File path: datafusion/src/physical_plan/sort.rs
##########
@@ -99,11 +99,11 @@ impl ExecutionPlan for SortExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
+        self.input.output_partitioning()

Review comment:
       This is the change from #377 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-849522852


   Apologies - I stripped out the merge that fixed the logical conflict :facepalm: 
   
   Pushed a commit that fixes it :smile: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] jhorstmann commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
jhorstmann commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r637948151



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {

Review comment:
       For bigger number of partitions, storing the cursors in a BinaryHeap, sorted by their current item, would be beneficial.
   
   A rust implementation of that approach can be seen in [this blog post and the first comment under it][1]. I have implemented the same approach in java before. I agree with @alamb though to make it work first, and then optimize later.
   
   [1]: https://dev.to/creativcoder/merge-k-sorted-arrays-in-rust-1b2f




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-847038935


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#379](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (551c197) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/db4f098d38993b96ce1134c4bc7bf5c6579509cf?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (db4f098) will **increase** coverage by `0.07%`.
   > The diff coverage is `81.05%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/379/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #379      +/-   ##
   ==========================================
   + Coverage   74.94%   75.01%   +0.07%     
   ==========================================
     Files         146      147       +1     
     Lines       24314    24783     +469     
   ==========================================
   + Hits        18221    18591     +370     
   - Misses       6093     6192      +99     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `82.75% <ø> (ø)` | |
   | [datafusion/src/physical\_plan/sort.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9zb3J0LnJz) | `91.74% <62.50%> (-0.34%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/common.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9jb21tb24ucnM=) | `84.21% <77.77%> (-2.00%)` | :arrow_down: |
   | [...afusion/src/physical\_plan/sort\_preserving\_merge.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9zb3J0X3ByZXNlcnZpbmdfbWVyZ2UucnM=) | `81.79% <81.79%> (ø)` | |
   | [datafusion/src/physical\_plan/merge.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tZXJnZS5ycw==) | `75.00% <100.00%> (+0.71%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/repartition.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9yZXBhcnRpdGlvbi5ycw==) | `82.45% <0.00%> (-1.89%)` | :arrow_down: |
   | [datafusion/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvc2NhbGFyLnJz) | `53.71% <0.00%> (-1.77%)` | :arrow_down: |
   | [datafusion/src/physical\_optimizer/pruning.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfb3B0aW1pemVyL3BydW5pbmcucnM=) | `89.73% <0.00%> (-0.88%)` | :arrow_down: |
   | [datafusion/src/logical\_plan/expr.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvbG9naWNhbF9wbGFuL2V4cHIucnM=) | `84.34% <0.00%> (ø)` | |
   | ... and [3 more](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [db4f098...551c197](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r637107151



##########
File path: datafusion/src/physical_plan/common.rs
##########
@@ -113,3 +118,29 @@ fn build_file_list_recurse(
     }
     Ok(())
 }
+
+/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
+pub(crate) fn spawn_execution(

Review comment:
       this is a nice abstraction (and we can probably use it elsewhere)

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {

Review comment:
       I think this could return `Result<RecordBatch>` and avoid the `expect()` -- it would also save you having to wrap the function call in an `Ok` which seems like a good thing

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {
+        // Mapping from stream index to the index of the first buffer from that stream
+        let mut buffer_idx = 0;
+        let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
+
+        for cursors in &self.cursors {
+            stream_to_buffer_idx.push(buffer_idx);
+            buffer_idx += cursors.len();
+        }
+
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
+                    .cursors
+                    .iter()
+                    .flat_map(|cursor| {
+                        cursor
+                            .iter()
+                            .map(|cursor| cursor.batch.column(column_idx).data())
+                    })
+                    .collect();
+
+                let mut array_data = MutableArrayData::new(
+                    arrays,
+                    field.is_nullable(),
+                    self.in_progress.len(),
+                );
+
+                for row_index in &self.in_progress {
+                    let buffer_idx =
+                        stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
+
+                    // TODO: Coalesce contiguous writes
+                    array_data.extend(
+                        buffer_idx,
+                        row_index.row_idx,
+                        row_index.row_idx + 1,
+                    );
+                }
+
+                arrow::array::make_array(array_data.freeze())
+            })
+            .collect();
+
+        self.in_progress.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last cursor
+        // for each stream have been yielded to the newly created record batch
+        //
+        // Additionally as `in_progress` has been drained, there are no longer
+        // any RowIndex's reliant on the cursor indexes
+        //
+        // We can therefore drop all but the last cursor for each stream
+        for cursors in &mut self.cursors {
+            if cursors.len() > 1 {
+                // Drain all but the last cursor
+                cursors.drain(0..(cursors.len() - 1));
+            }
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns)
+            .expect("SortPreservingMergeStream: merged record batch was invalid")
+    }
+}
+
+impl Stream for SortPreservingMergeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.aborted {
+            return Poll::Ready(None);
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..self.cursors.len() {
+            match futures::ready!(self.maybe_poll_stream(cx, i)) {
+                Ok(_) => {}
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+            }
+        }
+
+        loop {
+            let stream_idx = match self.next_stream_idx() {
+                Ok(Some(idx)) => idx,
+                Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
+                Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
+                        e,
+                    )))));
+                }
+            };
+
+            let cursors = &mut self.cursors[stream_idx];
+            let cursor_idx = cursors.len() - 1;
+            let cursor = cursors.back_mut().unwrap();
+            let row_idx = cursor.advance();
+            let cursor_finished = cursor.is_finished();
+
+            self.in_progress.push(RowIndex {
+                stream_idx,
+                cursor_idx,
+                row_idx,
+            });
+
+            if self.in_progress.len() == self.target_batch_size {
+                return Poll::Ready(Some(Ok(self.build_record_batch())));
+            }
+
+            // If removed the last row from the cursor, need to fetch a new record
+            // batch if possible, before looping round again
+            if cursor_finished {
+                match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for SortPreservingMergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter::FromIterator;
+
+    use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use crate::assert_batches_eq;
+    use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::csv::CsvExec;
+    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::sort::SortExec;
+    use crate::physical_plan::{collect, common};
+    use crate::test;
+
+    use super::*;
+    use futures::SinkExt;
+    use tokio_stream::StreamExt;
+
+    #[tokio::test]
+    async fn test_merge() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("d"),
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: Default::default(),
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: Default::default(),
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
+                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
+                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
+                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
+                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
+                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
+                "+---+---+-------------------------------+",
+            ],
+            collected.as_slice()
+        );
+    }
+
+    async fn sorted_merge(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let mut result = collect(merge).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    async fn partition_sort(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let sort_exec = Arc::new(SortExec::try_new(sort.clone(), input).unwrap());
+        sorted_merge(sort_exec, sort).await
+    }
+
+    async fn basic_sort(
+        src: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(MergeExec::new(src));
+        let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
+        let mut result = collect(sort_exec).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort() {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("c2"),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let basic = basic_sort(csv.clone(), sort.clone()).await;
+        let partition = partition_sort(csv, sort).await;
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    // Split the provided record batch into multiple batch_size record batches
+    fn split_batch(sorted: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
+        let batches = (sorted.num_rows() + batch_size - 1) / batch_size;
+
+        // Split the sorted RecordBatch into multiple
+        (0..batches)
+            .into_iter()
+            .map(|batch_idx| {
+                let columns = (0..sorted.num_columns())
+                    .map(|column_idx| {
+                        let length =
+                            batch_size.min(sorted.num_rows() - batch_idx * batch_size);
+
+                        sorted
+                            .column(column_idx)
+                            .slice(batch_idx * batch_size, length)
+                    })
+                    .collect();
+
+                RecordBatch::try_new(sorted.schema(), columns).unwrap()
+            })
+            .collect()
+    }
+
+    async fn sorted_partitioned_input(
+        sort: Vec<PhysicalSortExpr>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sorted = basic_sort(csv, sort).await;
+        let split = split_batch(&sorted, 10);
+
+        Arc::new(
+            MemoryExec::try_new(
+                &[split.clone(), split.clone(), split],
+                sorted.schema(),
+                None,
+            )
+            .unwrap(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input() {
+        let sort = vec![
+            // uint8
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: Default::default(),
+            },
+            // int16
+            PhysicalSortExpr {
+                expr: col("c4"),
+                options: Default::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c13"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input = sorted_partitioned_input(sort.clone()).await;
+        let basic = basic_sort(input.clone(), sort.clone()).await;
+        let partition = sorted_merge(input, sort).await;
+
+        assert_eq!(basic.num_rows(), 300);
+        assert_eq!(partition.num_rows(), 300);
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input_output() {
+        let sort = vec![
+            // float64
+            PhysicalSortExpr {
+                expr: col("c12"),
+                options: Default::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c13"),
+                options: Default::default(),
+            },
+        ];
+
+        let input = sorted_partitioned_input(sort.clone()).await;
+        let basic = basic_sort(input.clone(), sort.clone()).await;
+
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 23));
+        let merged = collect(merge).await.unwrap();
+
+        assert_eq!(merged.len(), 14);
+
+        assert_eq!(basic.num_rows(), 300);
+        assert_eq!(merged.iter().map(|x| x.num_rows()).sum::<usize>(), 300);
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition =
+            arrow::util::pretty::pretty_format_batches(merged.as_slice()).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    #[tokio::test]
+    async fn test_nulls() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            None,
+            Some("a"),
+            Some("b"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(8),
+            None,
+            Some(6),
+            None,
+            Some(4),
+        ]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            None,
+            Some("b"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
+            Some(8),
+            None,
+            Some(5),
+            None,
+            Some(4),
+        ]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: true,
+                    },
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: SortOptions {
+                        descending: false,
+                        nulls_first: false,
+                    },
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 |   | 1970-01-01 00:00:00.000000008 |",
+                "| 1 |   | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | a |                               |",

Review comment:
       In order to cover the `nulls_first: false` case for `"c"` I think you need several rows here with a tie for a and b, and both a null and non value for c.  I didn't see any such cases (though I may have missed it)
   
   Perhaps adding a row like the following would be enough
   
   ```
                   "| 7 | b | NULL |",
   ```

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {

Review comment:
       eventually (not as part of this PR) we should add something like `required_child_sort_order` so the operators can report on what sortedness they are assuming.

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {
+        // Mapping from stream index to the index of the first buffer from that stream
+        let mut buffer_idx = 0;
+        let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
+
+        for cursors in &self.cursors {
+            stream_to_buffer_idx.push(buffer_idx);
+            buffer_idx += cursors.len();
+        }
+
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
+                    .cursors
+                    .iter()
+                    .flat_map(|cursor| {
+                        cursor
+                            .iter()
+                            .map(|cursor| cursor.batch.column(column_idx).data())
+                    })
+                    .collect();
+
+                let mut array_data = MutableArrayData::new(
+                    arrays,
+                    field.is_nullable(),
+                    self.in_progress.len(),
+                );
+
+                for row_index in &self.in_progress {
+                    let buffer_idx =
+                        stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
+
+                    // TODO: Coalesce contiguous writes
+                    array_data.extend(
+                        buffer_idx,
+                        row_index.row_idx,
+                        row_index.row_idx + 1,
+                    );
+                }
+
+                arrow::array::make_array(array_data.freeze())
+            })
+            .collect();
+
+        self.in_progress.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last cursor
+        // for each stream have been yielded to the newly created record batch
+        //
+        // Additionally as `in_progress` has been drained, there are no longer
+        // any RowIndex's reliant on the cursor indexes
+        //
+        // We can therefore drop all but the last cursor for each stream
+        for cursors in &mut self.cursors {
+            if cursors.len() > 1 {
+                // Drain all but the last cursor
+                cursors.drain(0..(cursors.len() - 1));
+            }
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns)
+            .expect("SortPreservingMergeStream: merged record batch was invalid")
+    }
+}
+
+impl Stream for SortPreservingMergeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.aborted {
+            return Poll::Ready(None);
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..self.cursors.len() {
+            match futures::ready!(self.maybe_poll_stream(cx, i)) {
+                Ok(_) => {}
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+            }
+        }
+
+        loop {
+            let stream_idx = match self.next_stream_idx() {
+                Ok(Some(idx)) => idx,
+                Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
+                Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
+                        e,
+                    )))));
+                }
+            };
+
+            let cursors = &mut self.cursors[stream_idx];
+            let cursor_idx = cursors.len() - 1;
+            let cursor = cursors.back_mut().unwrap();
+            let row_idx = cursor.advance();
+            let cursor_finished = cursor.is_finished();
+
+            self.in_progress.push(RowIndex {
+                stream_idx,
+                cursor_idx,
+                row_idx,
+            });
+
+            if self.in_progress.len() == self.target_batch_size {
+                return Poll::Ready(Some(Ok(self.build_record_batch())));
+            }
+
+            // If removed the last row from the cursor, need to fetch a new record
+            // batch if possible, before looping round again
+            if cursor_finished {
+                match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for SortPreservingMergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter::FromIterator;
+
+    use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use crate::assert_batches_eq;
+    use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::csv::CsvExec;
+    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::sort::SortExec;
+    use crate::physical_plan::{collect, common};
+    use crate::test;
+
+    use super::*;
+    use futures::SinkExt;
+    use tokio_stream::StreamExt;
+
+    #[tokio::test]
+    async fn test_merge() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("d"),
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: Default::default(),
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: Default::default(),
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
+                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
+                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
+                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
+                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
+                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
+                "+---+---+-------------------------------+",
+            ],
+            collected.as_slice()
+        );
+    }
+
+    async fn sorted_merge(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let mut result = collect(merge).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    async fn partition_sort(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let sort_exec = Arc::new(SortExec::try_new(sort.clone(), input).unwrap());
+        sorted_merge(sort_exec, sort).await
+    }
+
+    async fn basic_sort(
+        src: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(MergeExec::new(src));
+        let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
+        let mut result = collect(sort_exec).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort() {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("c2"),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let basic = basic_sort(csv.clone(), sort.clone()).await;
+        let partition = partition_sort(csv, sort).await;
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    // Split the provided record batch into multiple batch_size record batches

Review comment:
       This might be a function that we could add to `RecordBatch` itself? I can file a ticket to do so if you would like

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {
+        // Mapping from stream index to the index of the first buffer from that stream
+        let mut buffer_idx = 0;
+        let mut stream_to_buffer_idx = Vec::with_capacity(self.cursors.len());
+
+        for cursors in &self.cursors {
+            stream_to_buffer_idx.push(buffer_idx);
+            buffer_idx += cursors.len();
+        }
+
+        let columns = self
+            .schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(column_idx, field)| {
+                let arrays = self
+                    .cursors
+                    .iter()
+                    .flat_map(|cursor| {
+                        cursor
+                            .iter()
+                            .map(|cursor| cursor.batch.column(column_idx).data())
+                    })
+                    .collect();
+
+                let mut array_data = MutableArrayData::new(
+                    arrays,
+                    field.is_nullable(),
+                    self.in_progress.len(),
+                );
+
+                for row_index in &self.in_progress {
+                    let buffer_idx =
+                        stream_to_buffer_idx[row_index.stream_idx] + row_index.cursor_idx;
+
+                    // TODO: Coalesce contiguous writes
+                    array_data.extend(
+                        buffer_idx,
+                        row_index.row_idx,
+                        row_index.row_idx + 1,
+                    );
+                }
+
+                arrow::array::make_array(array_data.freeze())
+            })
+            .collect();
+
+        self.in_progress.clear();
+
+        // New cursors are only created once the previous cursor for the stream
+        // is finished. This means all remaining rows from all but the last cursor
+        // for each stream have been yielded to the newly created record batch
+        //
+        // Additionally as `in_progress` has been drained, there are no longer
+        // any RowIndex's reliant on the cursor indexes
+        //
+        // We can therefore drop all but the last cursor for each stream
+        for cursors in &mut self.cursors {
+            if cursors.len() > 1 {
+                // Drain all but the last cursor
+                cursors.drain(0..(cursors.len() - 1));
+            }
+        }
+
+        RecordBatch::try_new(self.schema.clone(), columns)
+            .expect("SortPreservingMergeStream: merged record batch was invalid")
+    }
+}
+
+impl Stream for SortPreservingMergeStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.aborted {
+            return Poll::Ready(None);
+        }
+
+        // Ensure all non-exhausted streams have a cursor from which
+        // rows can be pulled
+        for i in 0..self.cursors.len() {
+            match futures::ready!(self.maybe_poll_stream(cx, i)) {
+                Ok(_) => {}
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(e)));
+                }
+            }
+        }
+
+        loop {
+            let stream_idx = match self.next_stream_idx() {
+                Ok(Some(idx)) => idx,
+                Ok(None) if self.in_progress.is_empty() => return Poll::Ready(None),
+                Ok(None) => return Poll::Ready(Some(Ok(self.build_record_batch()))),
+                Err(e) => {
+                    self.aborted = true;
+                    return Poll::Ready(Some(Err(ArrowError::ExternalError(Box::new(
+                        e,
+                    )))));
+                }
+            };
+
+            let cursors = &mut self.cursors[stream_idx];
+            let cursor_idx = cursors.len() - 1;
+            let cursor = cursors.back_mut().unwrap();
+            let row_idx = cursor.advance();
+            let cursor_finished = cursor.is_finished();
+
+            self.in_progress.push(RowIndex {
+                stream_idx,
+                cursor_idx,
+                row_idx,
+            });
+
+            if self.in_progress.len() == self.target_batch_size {
+                return Poll::Ready(Some(Ok(self.build_record_batch())));
+            }
+
+            // If removed the last row from the cursor, need to fetch a new record
+            // batch if possible, before looping round again
+            if cursor_finished {
+                match futures::ready!(self.maybe_poll_stream(cx, stream_idx)) {
+                    Ok(_) => {}
+                    Err(e) => {
+                        self.aborted = true;
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl RecordBatchStream for SortPreservingMergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::iter::FromIterator;
+
+    use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use crate::assert_batches_eq;
+    use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::csv::CsvExec;
+    use crate::physical_plan::expressions::col;
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::sort::SortExec;
+    use crate::physical_plan::{collect, common};
+    use crate::test;
+
+    use super::*;
+    use futures::SinkExt;
+    use tokio_stream::StreamExt;
+
+    #[tokio::test]
+    async fn test_merge() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("e"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 4]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("d"),
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 2, 2, 6]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+        let schema = b1.schema();
+
+        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(
+            vec![
+                PhysicalSortExpr {
+                    expr: col("b"),
+                    options: Default::default(),
+                },
+                PhysicalSortExpr {
+                    expr: col("c"),
+                    options: Default::default(),
+                },
+            ],
+            Arc::new(exec),
+            1024,
+        ));
+
+        let collected = collect(merge).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        assert_batches_eq!(
+            &[
+                "+---+---+-------------------------------+",
+                "| a | b | c                             |",
+                "+---+---+-------------------------------+",
+                "| 1 | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2 | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7 | c | 1970-01-01 00:00:00.000000006 |",
+                "| 1 | d | 1970-01-01 00:00:00.000000004 |",
+                "| 9 | d | 1970-01-01 00:00:00.000000005 |",
+                "| 3 | e | 1970-01-01 00:00:00.000000004 |",
+                "| 2 | e | 1970-01-01 00:00:00.000000006 |",
+                "| 3 | g | 1970-01-01 00:00:00.000000002 |",
+                "| 4 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 5 | i | 1970-01-01 00:00:00.000000006 |",
+                "+---+---+-------------------------------+",
+            ],
+            collected.as_slice()
+        );
+    }
+
+    async fn sorted_merge(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, input, 1024));
+        let mut result = collect(merge).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    async fn partition_sort(
+        input: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let sort_exec = Arc::new(SortExec::try_new(sort.clone(), input).unwrap());
+        sorted_merge(sort_exec, sort).await
+    }
+
+    async fn basic_sort(
+        src: Arc<dyn ExecutionPlan>,
+        sort: Vec<PhysicalSortExpr>,
+    ) -> RecordBatch {
+        let merge = Arc::new(MergeExec::new(src));
+        let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
+        let mut result = collect(sort_exec).await.unwrap();
+        assert_eq!(result.len(), 1);
+        result.remove(0)
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort() {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions {
+                    descending: true,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("c2"),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let basic = basic_sort(csv.clone(), sort.clone()).await;
+        let partition = partition_sort(csv, sort).await;
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    // Split the provided record batch into multiple batch_size record batches
+    fn split_batch(sorted: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
+        let batches = (sorted.num_rows() + batch_size - 1) / batch_size;
+
+        // Split the sorted RecordBatch into multiple
+        (0..batches)
+            .into_iter()
+            .map(|batch_idx| {
+                let columns = (0..sorted.num_columns())
+                    .map(|column_idx| {
+                        let length =
+                            batch_size.min(sorted.num_rows() - batch_idx * batch_size);
+
+                        sorted
+                            .column(column_idx)
+                            .slice(batch_idx * batch_size, length)
+                    })
+                    .collect();
+
+                RecordBatch::try_new(sorted.schema(), columns).unwrap()
+            })
+            .collect()
+    }
+
+    async fn sorted_partitioned_input(
+        sort: Vec<PhysicalSortExpr>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let schema = test::aggr_test_schema();
+        let partitions = 4;
+        let path =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions).unwrap();
+        let csv = Arc::new(
+            CsvExec::try_new(
+                &path,
+                CsvReadOptions::new().schema(&schema),
+                None,
+                1024,
+                None,
+            )
+            .unwrap(),
+        );
+
+        let sorted = basic_sort(csv, sort).await;
+        let split = split_batch(&sorted, 10);
+
+        Arc::new(
+            MemoryExec::try_new(
+                &[split.clone(), split.clone(), split],
+                sorted.schema(),
+                None,
+            )
+            .unwrap(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input() {
+        let sort = vec![
+            // uint8
+            PhysicalSortExpr {
+                expr: col("c7"),
+                options: Default::default(),
+            },
+            // int16
+            PhysicalSortExpr {
+                expr: col("c4"),
+                options: Default::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c1"),
+                options: SortOptions::default(),
+            },
+            // utf-8
+            PhysicalSortExpr {
+                expr: col("c13"),
+                options: SortOptions::default(),
+            },
+        ];
+
+        let input = sorted_partitioned_input(sort.clone()).await;
+        let basic = basic_sort(input.clone(), sort.clone()).await;
+        let partition = sorted_merge(input, sort).await;
+
+        assert_eq!(basic.num_rows(), 300);
+        assert_eq!(partition.num_rows(), 300);
+
+        let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap();
+        let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap();
+
+        assert_eq!(basic, partition);
+    }
+
+    #[tokio::test]
+    async fn test_partition_sort_streaming_input_output() {

Review comment:
       I think this test covers the case where each input stream has more than one `RecordBatch`, right (each input partition has three record batches).
   
   Is there any value to another test that has input streams with differing numbers of input batches (I am thinking of an input with 3 partitions: 0 record batches, 1 record batch, and "many" (aka 2 or 3))?

##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal

Review comment:
       I predicate this line will be the bottleneck of this operator.
   
   However, I feel like getting it in and working and then optimizing as a follow on is the correct course of action in this case. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
tustvold commented on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-848651262


   Will rebase to remove merges


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] codecov-commenter edited a comment on pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#issuecomment-847038935


   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#379](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7d3dbc5) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/3593d1f358b54bd83cfbafdd18e3664f6f7ab73d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3593d1f) will **increase** coverage by `0.54%`.
   > The diff coverage is `81.62%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow-datafusion/pull/379/graphs/tree.svg?width=650&height=150&src=pr&token=JXwWBKD3D9&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master     #379      +/-   ##
   ==========================================
   + Coverage   74.85%   75.39%   +0.54%     
   ==========================================
     Files         146      148       +2     
     Lines       24565    25242     +677     
   ==========================================
   + Hits        18387    19031     +644     
   - Misses       6178     6211      +33     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/src/physical\_plan/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tb2QucnM=) | `78.70% <ø> (-4.06%)` | :arrow_down: |
   | [datafusion/src/physical\_plan/common.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9jb21tb24ucnM=) | `84.21% <77.77%> (-2.00%)` | :arrow_down: |
   | [...afusion/src/physical\_plan/sort\_preserving\_merge.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9zb3J0X3ByZXNlcnZpbmdfbWVyZ2UucnM=) | `81.66% <81.66%> (ø)` | |
   | [datafusion/src/physical\_plan/merge.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9tZXJnZS5ycw==) | `75.00% <100.00%> (+0.71%)` | :arrow_up: |
   | [datafusion/src/physical\_plan/window\_functions.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi93aW5kb3dfZnVuY3Rpb25zLnJz) | `85.71% <0.00%> (-3.01%)` | :arrow_down: |
   | [datafusion/src/scalar.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvc2NhbGFyLnJz) | `56.19% <0.00%> (-2.13%)` | :arrow_down: |
   | [ballista/rust/client/src/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YmFsbGlzdGEvcnVzdC9jbGllbnQvc3JjL2NvbnRleHQucnM=) | `0.00% <0.00%> (ø)` | |
   | [datafusion/src/physical\_plan/expressions/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9tb2QucnM=) | `71.42% <0.00%> (ø)` | |
   | [...fusion/src/physical\_plan/expressions/row\_number.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcmMvcGh5c2ljYWxfcGxhbi9leHByZXNzaW9ucy9yb3dfbnVtYmVyLnJz) | `81.25% <0.00%> (ø)` | |
   | ... and [17 more](https://codecov.io/gh/apache/arrow-datafusion/pull/379/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3593d1f...7d3dbc5](https://codecov.io/gh/apache/arrow-datafusion/pull/379?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] tustvold commented on a change in pull request #379: Sort preserving merge (#362)

Posted by GitBox <gi...@apache.org>.
tustvold commented on a change in pull request #379:
URL: https://github.com/apache/arrow-datafusion/pull/379#discussion_r637920925



##########
File path: datafusion/src/physical_plan/sort_preserving_merge.rs
##########
@@ -0,0 +1,955 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines the sort preserving merge plan
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::VecDeque;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{ArrayRef, MutableArrayData};
+use arrow::compute::SortOptions;
+use async_trait::async_trait;
+use futures::channel::mpsc;
+use futures::stream::FusedStream;
+use futures::{Stream, StreamExt};
+
+use crate::arrow::datatypes::SchemaRef;
+use crate::arrow::error::ArrowError;
+use crate::arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::common::spawn_execution;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+
+/// Sort preserving merge execution plan
+///
+/// This takes an input execution plan and a list of sort expressions, and
+/// provided each partition of the input plan is sorted with respect to
+/// these sort expressions, this operator will yield a single partition
+/// that is also sorted with respect to them
+#[derive(Debug)]
+pub struct SortPreservingMergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Sort expressions
+    expr: Vec<PhysicalSortExpr>,
+    /// The target size of yielded batches
+    target_batch_size: usize,
+}
+
+impl SortPreservingMergeExec {
+    /// Create a new sort execution plan
+    pub fn new(
+        expr: Vec<PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            input,
+            expr,
+            target_batch_size,
+        }
+    }
+
+    /// Input schema
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.input
+    }
+
+    /// Sort expressions
+    pub fn expr(&self) -> &[PhysicalSortExpr] {
+        &self.expr
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for SortPreservingMergeExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn required_child_distribution(&self) -> Distribution {
+        Distribution::UnspecifiedDistribution
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(SortPreservingMergeExec::new(
+                self.expr.clone(),
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        if 0 != partition {
+            return Err(DataFusionError::Internal(format!(
+                "SortPreservingMergeExec invalid partition {}",
+                partition
+            )));
+        }
+
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
+            0 => Err(DataFusionError::Internal(
+                "SortPreservingMergeExec requires at least one input partition"
+                    .to_owned(),
+            )),
+            1 => {
+                // bypass if there is only one partition to merge
+                self.input.execute(0).await
+            }
+            _ => {
+                let streams = (0..input_partitions)
+                    .into_iter()
+                    .map(|part_i| {
+                        let (sender, receiver) = mpsc::channel(1);
+                        spawn_execution(self.input.clone(), sender, part_i);
+                        receiver
+                    })
+                    .collect();
+
+                Ok(Box::pin(SortPreservingMergeStream::new(
+                    streams,
+                    self.schema(),
+                    &self.expr,
+                    self.target_batch_size,
+                )))
+            }
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
+                write!(f, "SortPreservingMergeExec: [{}]", expr.join(","))
+            }
+        }
+    }
+}
+
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of `PhysicalExpr` that when
+/// evaluated on the `RecordBatch` yield the sort keys.
+///
+/// Additionally it maintains a row cursor that can be advanced through the rows
+/// of the provided `RecordBatch`
+///
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to by this
+/// row cursor, with that of another `SortKeyCursor`
+#[derive(Debug, Clone)]
+struct SortKeyCursor {
+    columns: Vec<ArrayRef>,
+    batch: RecordBatch,
+    cur_row: usize,
+    num_rows: usize,
+}
+
+impl SortKeyCursor {
+    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> Result<Self> {
+        let columns = sort_key
+            .iter()
+            .map(|expr| Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            cur_row: 0,
+            num_rows: batch.num_rows(),
+            columns,
+            batch,
+        })
+    }
+
+    fn is_finished(&self) -> bool {
+        self.num_rows == self.cur_row
+    }
+
+    fn advance(&mut self) -> usize {
+        assert!(!self.is_finished());
+        let t = self.cur_row;
+        self.cur_row += 1;
+        t
+    }
+
+    /// Compares the sort key pointed to by this instance's row cursor with that of another
+    fn compare(
+        &self,
+        other: &SortKeyCursor,
+        options: &[SortOptions],
+    ) -> Result<Ordering> {
+        if self.columns.len() != other.columns.len() {
+            return Err(DataFusionError::Internal(format!(
+                "SortKeyCursors had inconsistent column counts: {} vs {}",
+                self.columns.len(),
+                other.columns.len()
+            )));
+        }
+
+        if self.columns.len() != options.len() {
+            return Err(DataFusionError::Internal(format!(
+                "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
+                self.columns.len(),
+                options.len()
+            )));
+        }
+
+        let zipped = self
+            .columns
+            .iter()
+            .zip(other.columns.iter())
+            .zip(options.iter());
+
+        for ((l, r), sort_options) in zipped {
+            match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
+                (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
+                (false, true) => return Ok(Ordering::Greater),
+                (true, false) if sort_options.nulls_first => {
+                    return Ok(Ordering::Greater)
+                }
+                (true, false) => return Ok(Ordering::Less),
+                (false, false) => {}
+                (true, true) => {
+                    // TODO: Building the predicate each time is sub-optimal
+                    let c = arrow::array::build_compare(l.as_ref(), r.as_ref())?;
+                    match c(self.cur_row, other.cur_row) {
+                        Ordering::Equal => {}
+                        o if sort_options.descending => return Ok(o.reverse()),
+                        o => return Ok(o),
+                    }
+                }
+            }
+        }
+
+        Ok(Ordering::Equal)
+    }
+}
+
+/// A `RowIndex` identifies a specific row from those buffered
+/// by a `SortPreservingMergeStream`
+#[derive(Debug, Clone)]
+struct RowIndex {
+    /// The index of the stream
+    stream_idx: usize,
+    /// The index of the cursor within the stream's VecDequeue
+    cursor_idx: usize,
+    /// The row index
+    row_idx: usize,
+}
+
+#[derive(Debug)]
+struct SortPreservingMergeStream {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// The sorted input streams to merge together
+    streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+    /// For each input stream maintain a dequeue of SortKeyCursor
+    ///
+    /// Exhausted cursors will be popped off the front once all
+    /// their rows have been yielded to the output
+    cursors: Vec<VecDeque<SortKeyCursor>>,
+    /// The accumulated row indexes for the next record batch
+    in_progress: Vec<RowIndex>,
+    /// The physical expressions to sort by
+    column_expressions: Vec<Arc<dyn PhysicalExpr>>,
+    /// The sort options for each expression
+    sort_options: Vec<SortOptions>,
+    /// The desired RecordBatch size to yield
+    target_batch_size: usize,
+    /// If the stream has encountered an error
+    aborted: bool,
+}
+
+impl SortPreservingMergeStream {
+    fn new(
+        streams: Vec<mpsc::Receiver<ArrowResult<RecordBatch>>>,
+        schema: SchemaRef,
+        expressions: &[PhysicalSortExpr],
+        target_batch_size: usize,
+    ) -> Self {
+        Self {
+            schema,
+            cursors: vec![Default::default(); streams.len()],
+            streams,
+            column_expressions: expressions.iter().map(|x| x.expr.clone()).collect(),
+            sort_options: expressions.iter().map(|x| x.options).collect(),
+            target_batch_size,
+            aborted: false,
+            in_progress: vec![],
+        }
+    }
+
+    /// If the stream at the given index is not exhausted, and the last cursor for the
+    /// stream is finished, poll the stream for the next RecordBatch and create a new
+    /// cursor for the stream from the returned result
+    fn maybe_poll_stream(
+        &mut self,
+        cx: &mut Context<'_>,
+        idx: usize,
+    ) -> Poll<ArrowResult<()>> {
+        if let Some(cursor) = &self.cursors[idx].back() {
+            if !cursor.is_finished() {
+                // Cursor is not finished - don't need a new RecordBatch yet
+                return Poll::Ready(Ok(()));
+            }
+        }
+
+        let stream = &mut self.streams[idx];
+        if stream.is_terminated() {
+            return Poll::Ready(Ok(()));
+        }
+
+        // Fetch a new record and create a cursor from it
+        match futures::ready!(stream.poll_next_unpin(cx)) {
+            None => return Poll::Ready(Ok(())),
+            Some(Err(e)) => {
+                return Poll::Ready(Err(e));
+            }
+            Some(Ok(batch)) => {
+                let cursor = match SortKeyCursor::new(batch, &self.column_expressions) {
+                    Ok(cursor) => cursor,
+                    Err(e) => {
+                        return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
+                    }
+                };
+                self.cursors[idx].push_back(cursor)
+            }
+        }
+
+        Poll::Ready(Ok(()))
+    }
+
+    /// Returns the index of the next stream to pull a row from, or None
+    /// if all cursors for all streams are exhausted
+    fn next_stream_idx(&self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter().enumerate() {
+            if let Some(candidate) = candidate.back() {
+                if candidate.is_finished() {
+                    continue;
+                }
+
+                match min_cursor {
+                    None => min_cursor = Some((idx, candidate)),
+                    Some((_, ref min)) => {
+                        if min.compare(candidate, &self.sort_options)?
+                            == Ordering::Greater
+                        {
+                            min_cursor = Some((idx, candidate))
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(min_cursor.map(|(idx, _)| idx))
+    }
+
+    /// Drains the in_progress row indexes, and builds a new RecordBatch from them
+    ///
+    /// Will then drop any cursors for which all rows have been yielded to the output
+    fn build_record_batch(&mut self) -> RecordBatch {

Review comment:
       Can do, I didn't because propagating errors that should be impossible makes me uncomfortable 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org