You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/05/31 13:00:20 UTC
[arrow-datafusion] branch master updated: Move `SortKeyCursor` and `RowIndex` into modules, add `sort_key_cursor` test (#2645)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new acb245a9c Move `SortKeyCursor` and `RowIndex` into modules, add `sort_key_cursor` test (#2645)
acb245a9c is described below
commit acb245a9c1d4cf30a484a26152ba0489886a8474
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue May 31 09:00:15 2022 -0400
Move `SortKeyCursor` and `RowIndex` into modules, add `sort_key_cursor` test (#2645)
* Move `SortKeyCursor` and `RowIndex` into modules, add `sort_key_cursor` test
* RAT
* more rat
* Update diagram
* Less macro
* test for stability too
* Fix test
---
.../src/physical_plan/sorts/{mod.rs => cursor.rs} | 64 ++----
datafusion/core/src/physical_plan/sorts/index.rs | 60 ++++++
datafusion/core/src/physical_plan/sorts/mod.rs | 202 +-----------------
.../physical_plan/sorts/sort_preserving_merge.rs | 2 +-
datafusion/core/tests/sort_key_cursor.rs | 233 +++++++++++++++++++++
5 files changed, 322 insertions(+), 239 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/cursor.rs
similarity index 87%
copy from datafusion/core/src/physical_plan/sorts/mod.rs
copy to datafusion/core/src/physical_plan/sorts/cursor.rs
index 8d499be3a..ebe4f95e2 100644
--- a/datafusion/core/src/physical_plan/sorts/mod.rs
+++ b/datafusion/core/src/physical_plan/sorts/cursor.rs
@@ -15,11 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-//! Sort functionalities
-
use crate::error;
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{PhysicalExpr, SendableRecordBatchStream};
+use crate::physical_plan::PhysicalExpr;
use arrow::array::{ArrayRef, DynComparator};
use arrow::compute::SortOptions;
use arrow::record_batch::RecordBatch;
@@ -27,12 +25,8 @@ use hashbrown::HashMap;
use parking_lot::RwLock;
use std::borrow::BorrowMut;
use std::cmp::Ordering;
-use std::fmt::{Debug, Formatter};
use std::sync::Arc;
-pub mod sort;
-pub mod sort_preserving_merge;
-
/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort keys.
///
@@ -42,7 +36,7 @@ pub mod sort_preserving_merge;
/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to
/// by this row cursor, with that of another `SortKeyCursor`. A cursor stores
/// a row comparator for each other cursor that it is compared to.
-struct SortKeyCursor {
+pub struct SortKeyCursor {
stream_idx: usize,
sort_columns: Vec<ArrayRef>,
cur_row: usize,
@@ -71,7 +65,8 @@ impl<'a> std::fmt::Debug for SortKeyCursor {
}
impl SortKeyCursor {
- fn new(
+ /// Create a new SortKeyCursor
+ pub fn new(
stream_idx: usize,
batch_id: usize,
batch: &RecordBatch,
@@ -93,11 +88,27 @@ impl SortKeyCursor {
})
}
- fn is_finished(&self) -> bool {
+ #[inline(always)]
+ /// Return the stream index of this cursor
+ pub fn stream_idx(&self) -> usize {
+ self.stream_idx
+ }
+
+ #[inline(always)]
+ /// Return the batch id of this cursor
+ pub fn batch_id(&self) -> usize {
+ self.batch_id
+ }
+
+ #[inline(always)]
+ /// Return true if the cursor has advanced past the last row of its batch
+ pub fn is_finished(&self) -> bool {
self.num_rows == self.cur_row
}
- fn advance(&mut self) -> usize {
+ #[inline(always)]
+ /// Returns the cursor's current row, and advances the cursor to the next row
+ pub fn advance(&mut self) -> usize {
assert!(!self.is_finished());
let t = self.cur_row;
self.cur_row += 1;
@@ -105,7 +116,7 @@ impl SortKeyCursor {
}
/// Compares the sort key pointed to by this instance's row cursor with that of another
- fn compare(&self, other: &SortKeyCursor) -> error::Result<Ordering> {
+ pub fn compare(&self, other: &SortKeyCursor) -> error::Result<Ordering> {
if self.sort_columns.len() != other.sort_columns.len() {
return Err(DataFusionError::Internal(format!(
"SortKeyCursors had inconsistent column counts: {} vs {}",
@@ -207,32 +218,3 @@ impl PartialOrd for SortKeyCursor {
other.compare(self).ok()
}
}
-
-/// A `RowIndex` identifies a specific row from those buffered
-/// by a `SortPreservingMergeStream`
-#[derive(Debug, Clone)]
-struct RowIndex {
- /// The index of the stream
- stream_idx: usize,
- /// The index of the batch within the stream's VecDequeue.
- batch_idx: usize,
- /// The row index
- row_idx: usize,
-}
-
-pub(crate) struct SortedStream {
- stream: SendableRecordBatchStream,
- mem_used: usize,
-}
-
-impl Debug for SortedStream {
- fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
- write!(f, "InMemSorterStream")
- }
-}
-
-impl SortedStream {
- pub(crate) fn new(stream: SendableRecordBatchStream, mem_used: usize) -> Self {
- Self { stream, mem_used }
- }
-}
diff --git a/datafusion/core/src/physical_plan/sorts/index.rs b/datafusion/core/src/physical_plan/sorts/index.rs
new file mode 100644
index 000000000..3b45c6d38
--- /dev/null
+++ b/datafusion/core/src/physical_plan/sorts/index.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// A `RowIndex` identifies a specific row in a logical stream.
+///
+/// Each stream is identified by a `stream_idx` and is formed from a
+/// sequence of `RecordBatch`es, each of which is identified by
+/// a unique `batch_idx` within that stream.
+///
+/// This is used by `SortPreservingMergeStream` to identify
+/// the order of the tuples in the final sorted output stream.
+///
+/// ```text
+/// ┌────┐ ┌────┐ ┌────┐ RecordBatch
+/// │ │ │ │ │ │
+/// │ C1 │ │... │ │ CN │◀─────── (batch_idx = 0)
+/// │ │ │ │ │ │
+/// └────┘ └────┘ └────┘
+/// ┌────┐ ┌────┐ ┌────┐ RecordBatch
+/// │ │ │ │ │ │
+/// │ C1 │ │... │ │ CN │◀─────── (batch_idx = 1)
+/// │ │ │ │ │ │
+/// └────┘ └────┘ └────┘
+/// ┌────┐
+/// │ │ ...
+/// │ C1 │
+/// │ │ ┌────┐ RecordBatch
+/// └────┘ │ │
+/// │ CN │◀────── (batch_idx = M-1)
+/// │ │
+/// └────┘
+///
+///"Stream"s each with Stream N has M
+/// a potentially RecordBatches
+///different number of
+/// RecordBatches
+/// ```
+#[derive(Debug, Clone)]
+pub struct RowIndex {
+ /// The index of the stream (uniquely identifies the stream)
+ pub stream_idx: usize,
+ /// The index of the batch within the stream's `VecDeque`.
+ pub batch_idx: usize,
+ /// The row index within the batch
+ pub row_idx: usize,
+}
diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/mod.rs
index 8d499be3a..db6ab5c60 100644
--- a/datafusion/core/src/physical_plan/sorts/mod.rs
+++ b/datafusion/core/src/physical_plan/sorts/mod.rs
@@ -17,208 +17,16 @@
//! Sort functionalities
-use crate::error;
-use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{PhysicalExpr, SendableRecordBatchStream};
-use arrow::array::{ArrayRef, DynComparator};
-use arrow::compute::SortOptions;
-use arrow::record_batch::RecordBatch;
-use hashbrown::HashMap;
-use parking_lot::RwLock;
-use std::borrow::BorrowMut;
-use std::cmp::Ordering;
+use crate::physical_plan::SendableRecordBatchStream;
use std::fmt::{Debug, Formatter};
-use std::sync::Arc;
+mod cursor;
+mod index;
pub mod sort;
pub mod sort_preserving_merge;
-/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
-/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort keys.
-///
-/// Additionally it maintains a row cursor that can be advanced through the rows
-/// of the provided `RecordBatch`
-///
-/// `SortKeyCursor::compare` can then be used to compare the sort key pointed to
-/// by this row cursor, with that of another `SortKeyCursor`. A cursor stores
-/// a row comparator for each other cursor that it is compared to.
-struct SortKeyCursor {
- stream_idx: usize,
- sort_columns: Vec<ArrayRef>,
- cur_row: usize,
- num_rows: usize,
-
- // An id uniquely identifying the record batch scanned by this cursor.
- batch_id: usize,
-
- // A collection of comparators that compare rows in this cursor's batch to
- // the cursors in other batches. Other batches are uniquely identified by
- // their batch_idx.
- batch_comparators: RwLock<HashMap<usize, Vec<DynComparator>>>,
- sort_options: Arc<Vec<SortOptions>>,
-}
-
-impl<'a> std::fmt::Debug for SortKeyCursor {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- f.debug_struct("SortKeyCursor")
- .field("sort_columns", &self.sort_columns)
- .field("cur_row", &self.cur_row)
- .field("num_rows", &self.num_rows)
- .field("batch_id", &self.batch_id)
- .field("batch_comparators", &"<FUNC>")
- .finish()
- }
-}
-
-impl SortKeyCursor {
- fn new(
- stream_idx: usize,
- batch_id: usize,
- batch: &RecordBatch,
- sort_key: &[Arc<dyn PhysicalExpr>],
- sort_options: Arc<Vec<SortOptions>>,
- ) -> error::Result<Self> {
- let sort_columns = sort_key
- .iter()
- .map(|expr| Ok(expr.evaluate(batch)?.into_array(batch.num_rows())))
- .collect::<error::Result<_>>()?;
- Ok(Self {
- stream_idx,
- cur_row: 0,
- num_rows: batch.num_rows(),
- sort_columns,
- batch_id,
- batch_comparators: RwLock::new(HashMap::new()),
- sort_options,
- })
- }
-
- fn is_finished(&self) -> bool {
- self.num_rows == self.cur_row
- }
-
- fn advance(&mut self) -> usize {
- assert!(!self.is_finished());
- let t = self.cur_row;
- self.cur_row += 1;
- t
- }
-
- /// Compares the sort key pointed to by this instance's row cursor with that of another
- fn compare(&self, other: &SortKeyCursor) -> error::Result<Ordering> {
- if self.sort_columns.len() != other.sort_columns.len() {
- return Err(DataFusionError::Internal(format!(
- "SortKeyCursors had inconsistent column counts: {} vs {}",
- self.sort_columns.len(),
- other.sort_columns.len()
- )));
- }
-
- if self.sort_columns.len() != self.sort_options.len() {
- return Err(DataFusionError::Internal(format!(
- "Incorrect number of SortOptions provided to SortKeyCursor::compare, expected {} got {}",
- self.sort_columns.len(),
- self.sort_options.len()
- )));
- }
-
- let zipped: Vec<((&ArrayRef, &ArrayRef), &SortOptions)> = self
- .sort_columns
- .iter()
- .zip(other.sort_columns.iter())
- .zip(self.sort_options.iter())
- .collect::<Vec<_>>();
-
- self.init_cmp_if_needed(other, &zipped)?;
- let map = self.batch_comparators.read();
- let cmp = map.get(&other.batch_id).ok_or_else(|| {
- DataFusionError::Execution(format!(
- "Failed to find comparator for {} cmp {}",
- self.batch_id, other.batch_id
- ))
- })?;
-
- for (i, ((l, r), sort_options)) in zipped.iter().enumerate() {
- match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
- (false, true) if sort_options.nulls_first => return Ok(Ordering::Less),
- (false, true) => return Ok(Ordering::Greater),
- (true, false) if sort_options.nulls_first => {
- return Ok(Ordering::Greater)
- }
- (true, false) => return Ok(Ordering::Less),
- (false, false) => {}
- (true, true) => match cmp[i](self.cur_row, other.cur_row) {
- Ordering::Equal => {}
- o if sort_options.descending => return Ok(o.reverse()),
- o => return Ok(o),
- },
- }
- }
-
- // Break ties using stream_idx to ensure a predictable
- // ordering of rows when comparing equal streams.
- Ok(self.stream_idx.cmp(&other.stream_idx))
- }
-
- /// Initialize a collection of comparators for comparing
- /// columnar arrays of this cursor and "other" if needed.
- fn init_cmp_if_needed(
- &self,
- other: &SortKeyCursor,
- zipped: &[((&ArrayRef, &ArrayRef), &SortOptions)],
- ) -> Result<()> {
- let hm = self.batch_comparators.read();
- if !hm.contains_key(&other.batch_id) {
- drop(hm);
- let mut map = self.batch_comparators.write();
- let cmp = map
- .borrow_mut()
- .entry(other.batch_id)
- .or_insert_with(|| Vec::with_capacity(other.sort_columns.len()));
-
- for (i, ((l, r), _)) in zipped.iter().enumerate() {
- if i >= cmp.len() {
- // initialise comparators
- cmp.push(arrow::array::build_compare(l.as_ref(), r.as_ref())?);
- }
- }
- }
- Ok(())
- }
-}
-
-impl Ord for SortKeyCursor {
- /// Needed by min-heap comparison and reverse the order at the same time.
- fn cmp(&self, other: &Self) -> Ordering {
- other.compare(self).unwrap()
- }
-}
-
-impl PartialEq for SortKeyCursor {
- fn eq(&self, other: &Self) -> bool {
- other.compare(self).unwrap() == Ordering::Equal
- }
-}
-
-impl Eq for SortKeyCursor {}
-
-impl PartialOrd for SortKeyCursor {
- fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
- other.compare(self).ok()
- }
-}
-
-/// A `RowIndex` identifies a specific row from those buffered
-/// by a `SortPreservingMergeStream`
-#[derive(Debug, Clone)]
-struct RowIndex {
- /// The index of the stream
- stream_idx: usize,
- /// The index of the batch within the stream's VecDequeue.
- batch_idx: usize,
- /// The row index
- row_idx: usize,
-}
+pub use cursor::SortKeyCursor;
+pub use index::RowIndex;
pub(crate) struct SortedStream {
stream: SendableRecordBatchStream,
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 515300eff..f7ce73834 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -551,7 +551,7 @@ impl SortPreservingMergeStream {
match self.min_heap.pop() {
Some(mut cursor) => {
- let stream_idx = cursor.stream_idx;
+ let stream_idx = cursor.stream_idx();
let batch_idx = self.batches[stream_idx].len() - 1;
let row_idx = cursor.advance();
diff --git a/datafusion/core/tests/sort_key_cursor.rs b/datafusion/core/tests/sort_key_cursor.rs
new file mode 100644
index 000000000..7672ea577
--- /dev/null
+++ b/datafusion/core/tests/sort_key_cursor.rs
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains tests for SortKeyCursor
+
+use std::{cmp::Ordering, sync::Arc};
+
+use arrow::{array::Int64Array, compute::SortOptions, record_batch::RecordBatch};
+use datafusion::physical_plan::sorts::{RowIndex, SortKeyCursor};
+use datafusion_physical_expr::expressions::col;
+
+#[test]
+fn test_single_column() {
+ let batch1 = int64_batch(vec![Some(1), Some(2), Some(5), Some(6)]);
+ let batch2 = int64_batch(vec![Some(3), Some(4), Some(8), Some(9)]);
+
+ let mut cursor1 = CursorBuilder::new(batch1)
+ .with_stream_idx(11)
+ .with_batch_id(0)
+ .build();
+
+ let mut cursor2 = CursorBuilder::new(batch2)
+ .with_stream_idx(22)
+ .with_batch_id(0)
+ .build();
+
+ let expected = vec![
+ "11: (0, 0)",
+ "11: (0, 1)",
+ "22: (0, 0)",
+ "22: (0, 1)",
+ "11: (0, 2)",
+ "11: (0, 3)",
+ "22: (0, 2)",
+ "22: (0, 3)",
+ ];
+
+ assert_indexes(expected, run(&mut cursor1, &mut cursor2));
+}
+
+#[test]
+fn test_stable_compare() {
+ // Validate ties are broken by the lower stream idx to ensure stable sort
+ let batch1 = int64_batch(vec![Some(3), Some(4)]);
+ let batch2 = int64_batch(vec![Some(3)]);
+
+ let cursor1 = CursorBuilder::new(batch1)
+ // higher stream index
+ .with_stream_idx(33)
+ .with_batch_id(0);
+
+ let cursor2 = CursorBuilder::new(batch2)
+ // Lower stream index -- should always be first
+ .with_stream_idx(22)
+ .with_batch_id(0);
+
+ let expected = vec!["22: (0, 0)", "33: (0, 0)", "33: (0, 1)"];
+
+ // Output should be the same, regardless of order
+ assert_indexes(
+ &expected,
+ run(&mut cursor1.clone().build(), &mut cursor2.clone().build()),
+ );
+ assert_indexes(&expected, run(&mut cursor2.build(), &mut cursor1.build()));
+}
+
+/// Runs the two cursors to completion, merging them, and
+/// returning the sorted order of rows that would have been produced
+fn run(cursor1: &mut SortKeyCursor, cursor2: &mut SortKeyCursor) -> Vec<RowIndex> {
+ let mut indexes = vec![];
+ loop {
+ println!(
+ "(cursor1.is_finished(), cursor2.is_finished()): ({}, {})",
+ cursor1.is_finished(),
+ cursor2.is_finished()
+ );
+
+ match (cursor1.is_finished(), cursor2.is_finished()) {
+ (true, true) => return indexes,
+ (true, false) => return drain(cursor2, indexes),
+ (false, true) => return drain(cursor1, indexes),
+ // both cursors have more rows
+ (false, false) => match cursor1.compare(cursor2).unwrap() {
+ Ordering::Less => {
+ indexes.push(advance(cursor1));
+ }
+ Ordering::Equal => {
+ indexes.push(advance(cursor1));
+ indexes.push(advance(cursor2));
+ }
+ Ordering::Greater => {
+ indexes.push(advance(cursor2));
+ }
+ },
+ }
+ }
+}
+
+// Advance the cursor and return the RowIndex created
+fn advance(cursor: &mut SortKeyCursor) -> RowIndex {
+ let row_idx = cursor.advance();
+ RowIndex {
+ stream_idx: cursor.stream_idx(),
+ batch_idx: cursor.batch_id(),
+ row_idx,
+ }
+}
+
+// Drain remaining items in the cursor, appending result to indexes
+fn drain(cursor: &mut SortKeyCursor, mut indexes: Vec<RowIndex>) -> Vec<RowIndex> {
+ while !cursor.is_finished() {
+ indexes.push(advance(cursor))
+ }
+ indexes
+}
+
+/// Return the values as an [`Int64Array`] single record batch, with
+/// column "c1"
+fn int64_batch(values: impl IntoIterator<Item = Option<i64>>) -> RecordBatch {
+ let array: Int64Array = values.into_iter().collect();
+ RecordBatch::try_from_iter(vec![("c1", Arc::new(array) as _)]).unwrap()
+}
+
+/// helper for creating cursors to test
+#[derive(Debug, Clone)]
+struct CursorBuilder {
+ batch: RecordBatch,
+ stream_idx: Option<usize>,
+ batch_id: Option<usize>,
+}
+
+impl CursorBuilder {
+ fn new(batch: RecordBatch) -> Self {
+ Self {
+ batch,
+ stream_idx: None,
+ batch_id: None,
+ }
+ }
+
+ /// Set the stream index
+ fn with_stream_idx(mut self, stream_idx: usize) -> Self {
+ self.stream_idx = Some(stream_idx);
+ self
+ }
+
+ /// Set the batch id
+ fn with_batch_id(mut self, batch_id: usize) -> Self {
+ self.batch_id = Some(batch_id);
+ self
+ }
+
+ fn build(self) -> SortKeyCursor {
+ let Self {
+ batch,
+ stream_idx,
+ batch_id,
+ } = self;
+ let c1 = col("c1", &batch.schema()).unwrap();
+ let sort_key = vec![c1];
+
+ let sort_options = Arc::new(vec![SortOptions::default()]);
+
+ SortKeyCursor::new(
+ stream_idx.expect("stream idx not set"),
+ batch_id.expect("batch id not set"),
+ &batch,
+ &sort_key,
+ sort_options,
+ )
+ .unwrap()
+ }
+}
+
+/// Compares [`RowIndex`]es with a vector of strings, the result of
+/// pretty formatting the [`RowIndex`]es.
+///
+/// Designed so that failure output can be directly copy/pasted
+/// into the test code as expected results.
+fn assert_indexes(
+ expected_indexes: impl IntoIterator<Item = impl AsRef<str>>,
+ indexes: impl IntoIterator<Item = RowIndex>,
+) {
+ let expected_lines: Vec<_> = expected_indexes
+ .into_iter()
+ .map(|s| s.as_ref().to_string())
+ .collect();
+
+ let actual_lines = format_as_strings(indexes);
+
+ assert_eq!(
+ expected_lines, actual_lines,
+ "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ expected_lines, actual_lines
+ );
+}
+
+/// Formats an iterator of RowIndexes into strings for comparisons
+///
+/// ```text
+/// stream: (batch, index)
+/// ```
+///
+/// for example,
+/// ```text
+/// 1: (0, 2)
+/// ```
+/// means "Stream 1, batch id 0, row index 2"
+fn format_as_strings(indexes: impl IntoIterator<Item = RowIndex>) -> Vec<String> {
+ indexes
+ .into_iter()
+ .map(|row_index| {
+ format!(
+ "{}: ({}, {})",
+ row_index.stream_idx, row_index.batch_idx, row_index.row_idx
+ )
+ })
+ .collect()
+}