Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/04 19:56:51 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2132: Reduce SortExec memory usage by avoiding constructing a single huge batch

alamb commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842072399


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get a sorted iterator by sorting the concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we original use UInt32Array to index the combined mono batch,
+                    // since we originally use a UInt32Array to index the combined mono batch,
+                    // indexes into the component record batches won't overflow either,
Review Comment:
   👍  yeah I agree this approach is no more prone to overflow than the implementation on `master`
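
   As a side note for readers, a minimal illustration of why the `u32` cast is safe (the helper name below is hypothetical, not in the PR): `lexsort_to_indices` produces a `UInt32Array` over the concatenated sort columns, so the total row count must already fit in `u32`, which in turn bounds every per-batch `row_idx`.

   ```rust
   /// Hypothetical guard: if the summed batch lengths fit in u32,
   /// every per-batch row index fits in u32 as well.
   fn check_row_count_fits_u32(batch_lens: &[usize]) {
       let total_rows: usize = batch_lens.iter().sum();
       assert!(
           u32::try_from(total_rows).is_ok(),
           "cannot sort more than u32::MAX rows with u32 indices"
       );
   }
   ```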



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get a sorted iterator by sorting the concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we originally use a UInt32Array to index the combined mono batch,
+                    // indexes into the component record batches won't overflow either,
+                    // so use u32 here for space efficiency.
+                    batch_idx: i as u32,
+                    row_idx: r as u32,
+                })
+                .collect::<Vec<CompositeIndex>>()
+        })
+        .collect::<Vec<CompositeIndex>>();
+
+    let sort_columns = expr
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| {
+            let columns_i = sort_arrays
+                .iter()
+                .map(|cs| cs[i].as_ref())
+                .collect::<Vec<&dyn Array>>();
+            Ok(SortColumn {
+                values: concat(columns_i.as_slice())?,
+                options: Some(expr.options),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    let indices = lexsort_to_indices(&sort_columns, None)?;
 
-        pre_sort
-            .map(|batch| sort_batch(batch, schema.clone(), expressions))
-            .transpose()?
-    };
+    Ok(SortedIterator::new(indices, row_indices, batch_size))
+}
 
-    Ok(Box::pin(SizedRecordBatchStream::new(
-        schema,
-        vec![Arc::new(result.unwrap())],
-        tracking_metrics,
-    )))
+struct SortedIterator {
+    pos: usize,
+    indices: UInt32Array,
+    composite: Vec<CompositeIndex>,
+    batch_size: usize,
+    length: usize,
+}

Review Comment:
   ```suggestion
   struct SortedIterator {
       /// Current logical position in the iterator
       pos: usize,
       /// Indexes into the input representing the correctly sorted total output
       indices: UInt32Array,
       /// Map each logical input index to where it can be found in the sorted input batches
       composite: Vec<CompositeIndex>,
       /// Maximum batch size to produce
       batch_size: usize,
       /// Total length of the iterator
       length: usize,
   }
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get a sorted iterator by sorting the concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we originally use a UInt32Array to index the combined mono batch,
+                    // indexes into the component record batches won't overflow either,
+                    // so use u32 here for space efficiency.
+                    batch_idx: i as u32,
+                    row_idx: r as u32,
+                })
+                .collect::<Vec<CompositeIndex>>()
+        })
+        .collect::<Vec<CompositeIndex>>();
+
+    let sort_columns = expr
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| {
+            let columns_i = sort_arrays
+                .iter()
+                .map(|cs| cs[i].as_ref())
+                .collect::<Vec<&dyn Array>>();
+            Ok(SortColumn {
+                values: concat(columns_i.as_slice())?,
+                options: Some(expr.options),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    let indices = lexsort_to_indices(&sort_columns, None)?;
 
-        pre_sort
-            .map(|batch| sort_batch(batch, schema.clone(), expressions))
-            .transpose()?
-    };
+    Ok(SortedIterator::new(indices, row_indices, batch_size))

Review Comment:
   One thing I thought of as a way to improve this performance is to re-order `row_indices` using `indices` so you would have a single `Vec<CompositeIndex>`. Maybe that is better for cache locality, and it would release some small amount of memory sooner.
   
   
   However, the more interesting thing is that with a single `Vec` you could then potentially combine multiple `CompositeIndex`es when sequential outputs share the same input `RecordBatch`, and thus potentially reduce the number of calls to `MutableArrayData::extend`
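
   For illustration, a hedged sketch of what that coalescing could look like (`CompositeSlice` and `coalesce` are hypothetical names, not part of this PR; `CompositeIndex` is re-stated from the diff):

   ```rust
   #[derive(Debug, Copy, Clone)]
   struct CompositeIndex {
       batch_idx: u32,
       row_idx: u32,
   }

   /// A run of `len` consecutive rows starting at `start_row_idx` in one input batch.
   struct CompositeSlice {
       batch_idx: u32,
       start_row_idx: u32,
       len: usize,
   }

   /// Collapse indices (already re-ordered into output order) into runs,
   /// so each run needs only one `MutableArrayData::extend` call.
   fn coalesce(sorted: &[CompositeIndex]) -> Vec<CompositeSlice> {
       let mut slices: Vec<CompositeSlice> = Vec::new();
       for idx in sorted {
           // Does this row directly follow the current run in the same batch?
           let extends_last = slices.last().map_or(false, |s| {
               s.batch_idx == idx.batch_idx
                   && s.start_row_idx as usize + s.len == idx.row_idx as usize
           });
           if extends_last {
               slices.last_mut().unwrap().len += 1;
           } else {
               slices.push(CompositeSlice {
                   batch_idx: idx.batch_idx,
                   start_row_idx: idx.row_idx,
                   len: 1,
               });
           }
       }
       slices
   }
   ```

   With runs like these, a single `extend(batch_idx, start, start + len)` call could copy a whole run at once instead of one row per call.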



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -105,13 +107,21 @@ impl ExternalSorter {
         }
     }
 
-    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(
+        &self,
+        input: RecordBatch,
+        tracking_metrics: &MemTrackingMetrics,
+    ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
             self.try_grow(size).await?;
             self.metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
-            in_mem_batches.push(input);
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            let partial = sort_batch(input, self.schema.clone(), &self.expr)?;

Review Comment:
   It makes sense that sorting a batch in the same thread that produced it (while its data is still in the cache) improves performance. Nice find @yjshen 
   
   cc @tustvold who has been observing similar things while working on scheduling I/O and CPU decoding



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get a sorted iterator by sorting the concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we originally use a UInt32Array to index the combined mono batch,
+                    // indexes into the component record batches won't overflow either,
+                    // so use u32 here for space efficiency.
+                    batch_idx: i as u32,
+                    row_idx: r as u32,
+                })
+                .collect::<Vec<CompositeIndex>>()
+        })
+        .collect::<Vec<CompositeIndex>>();
+
+    let sort_columns = expr
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| {
+            let columns_i = sort_arrays
+                .iter()
+                .map(|cs| cs[i].as_ref())
+                .collect::<Vec<&dyn Array>>();
+            Ok(SortColumn {
+                values: concat(columns_i.as_slice())?,
+                options: Some(expr.options),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+    let indices = lexsort_to_indices(&sort_columns, None)?;
 
-        pre_sort
-            .map(|batch| sort_batch(batch, schema.clone(), expressions))
-            .transpose()?
-    };
+    Ok(SortedIterator::new(indices, row_indices, batch_size))
+}
 
-    Ok(Box::pin(SizedRecordBatchStream::new(
-        schema,
-        vec![Arc::new(result.unwrap())],
-        tracking_metrics,
-    )))
+struct SortedIterator {
+    pos: usize,
+    indices: UInt32Array,
+    composite: Vec<CompositeIndex>,
+    batch_size: usize,
+    length: usize,
+}
+
+impl SortedIterator {
+    fn new(
+        indices: UInt32Array,
+        composite: Vec<CompositeIndex>,
+        batch_size: usize,
+    ) -> Self {
+        let length = composite.len();
+        Self {
+            pos: 0,
+            indices,
+            composite,
+            batch_size,
+            length,
+        }
+    }
+
+    fn memory_size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + self.indices.get_array_memory_size()
+            + std::mem::size_of_val(&self.composite[..])
+    }
+}
+
+impl Iterator for SortedIterator {
+    type Item = Vec<CompositeIndex>;
+
+    /// Emit a max of `batch_size` positions each time
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.pos >= self.length {
+            return None;
+        }
+
+        let current_size = min(self.batch_size, self.length - self.pos);
+        let mut result = Vec::with_capacity(current_size);
+        for i in 0..current_size {
+            let p = self.pos + i;
+            let c_index = self.indices.value(p) as usize;
+            result.push(self.composite[c_index])
+        }
+        self.pos += current_size;
+        Some(result)
+    }
+}
+
+/// Stream of sorted record batches
+struct SortedSizedRecordBatchStream {
+    schema: SchemaRef,
+    batches: Vec<RecordBatch>,
+    sorted_iter: SortedIterator,
+    num_cols: usize,
+    metrics: MemTrackingMetrics,
+}
+
+impl SortedSizedRecordBatchStream {
+    /// Create a new `SortedSizedRecordBatchStream`
+    pub fn new(
+        schema: SchemaRef,
+        batches: Vec<RecordBatch>,
+        sorted_iter: SortedIterator,
+        metrics: MemTrackingMetrics,
+    ) -> Self {
+        let size = batches.iter().map(batch_byte_size).sum::<usize>()
+            + sorted_iter.memory_size();
+        metrics.init_mem_used(size);
+        let num_cols = batches[0].num_columns();
+        SortedSizedRecordBatchStream {
+            schema,
+            batches,
+            sorted_iter,
+            num_cols,
+            metrics,
+        }
+    }
+}
+
+impl Stream for SortedSizedRecordBatchStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        match self.sorted_iter.next() {
+            None => Poll::Ready(None),
+            Some(combined) => {
+                let mut output = Vec::with_capacity(self.num_cols);
+                for i in 0..self.num_cols {
+                    let arrays = self
+                        .batches
+                        .iter()
+                        .map(|b| b.column(i).data())
+                        .collect::<Vec<_>>();
+                    let mut mutable =

Review Comment:
   this is very clever 👍 👍 
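
   (The snippet is cut off at the comment anchor. For context, a hedged guess at how the body continues, based on arrow's `MutableArrayData` API; `combined` and `output` are names from the surrounding diff:)

   ```rust
   // Build the output column by copying rows from the source batches,
   // one row at a time, in sorted order.
   let mut mutable = MutableArrayData::new(arrays, false, combined.len());
   for x in combined.iter() {
       // (source batch index, start row inclusive, end row exclusive)
       mutable.extend(x.batch_idx as usize, x.row_idx as usize, x.row_idx as usize + 1);
   }
   output.push(make_array(mutable.freeze()));
   ```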



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org