You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/05 21:27:04 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5886: Generify SortPreservingMerge (#5882) (#5879)

alamb commented on code in PR #5886:
URL: https://github.com/apache/arrow-datafusion/pull/5886#discussion_r1159027422


##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::index::RowIndex;
+use arrow::array::{make_array, MutableArrayData};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use std::collections::VecDeque;
+
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]

Review Comment:
   ```suggestion
   /// Incrementally builds output [`RecordBatch`]es from multiple streams of [`RecordBatch`]es,
   /// using [`RowIndex`] to describe the outputs
   ```



##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::index::RowIndex;
+use arrow::array::{make_array, MutableArrayData};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use std::collections::VecDeque;
+
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
+#[derive(Debug)]
+pub struct BatchBuilder {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// For each input stream maintain a dequeue of RecordBatches
+    ///
+    /// Exhausted batches will be popped off the front once all
+    /// their rows have been yielded to the output
+    batches: Vec<VecDeque<RecordBatch>>,
+
+    /// The accumulated row indexes for the next record batch
+    indices: Vec<RowIndex>,
+}
+
+impl BatchBuilder {
+    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
+    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+        let batches = (0..stream_count).map(|_| VecDeque::new()).collect();
+
+        Self {
+            schema,
+            batches,
+            indices: Vec::with_capacity(batch_size),
+        }
+    }
+
+    /// Append a new batch in `stream_idx`
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+        self.batches[stream_idx].push_back(batch)
+    }
+
+    /// Push `row_idx` from the most recently appended batch in `stream_idx`
+    pub fn push_row(&mut self, stream_idx: usize, row_idx: usize) {
+        let batch_idx = self.batches[stream_idx].len() - 1;
+        self.indices.push(RowIndex {
+            stream_idx,
+            batch_idx,
+            row_idx,
+        });
+    }
+
+    /// Returns the number of rows in this [`BatchBuilder`]

Review Comment:
   ```suggestion
       /// Returns the number of in-progress rows in this [`BatchBuilder`].
       ///
       /// This is the maximum number of rows that can be created by calling [`build_record_batch`]
   ```



##########
datafusion/core/src/physical_plan/sorts/stream.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::cursor::SortKeyCursor;
+use crate::physical_plan::SendableRecordBatchStream;
+use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr};
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use arrow::row::{RowConverter, SortField};
+use futures::stream::{Fuse, StreamExt};
+use std::sync::Arc;
+use std::task::{ready, Context, Poll};
+
+/// A [`Stream`](futures::Stream) that has multiple partitions that can
+/// be polled separately but not concurrently
+///
+/// Used by sort preserving merge to decouple the cursor merging logic from
+/// the source of the cursors, the intention being to allow preserving
+/// any row encoding performed for intermediate sorts
+pub trait PartitionedStream: std::fmt::Debug + Send {
+    type Output;
+
+    /// Returns the number of partitions
+    fn partitions(&self) -> usize;
+
+    fn poll_next(

Review Comment:
   Is there a reason this implements `poll_next` itself (rather than extending the standard `Stream` trait directly -- like `: Stream`)?



##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::index::RowIndex;
+use arrow::array::{make_array, MutableArrayData};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use std::collections::VecDeque;
+
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
+#[derive(Debug)]
+pub struct BatchBuilder {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// For each input stream maintain a dequeue of RecordBatches
+    ///
+    /// Exhausted batches will be popped off the front once all
+    /// their rows have been yielded to the output
+    batches: Vec<VecDeque<RecordBatch>>,
+
+    /// The accumulated row indexes for the next record batch
+    indices: Vec<RowIndex>,
+}
+
+impl BatchBuilder {
+    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
+    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+        let batches = (0..stream_count).map(|_| VecDeque::new()).collect();
+
+        Self {
+            schema,
+            batches,
+            indices: Vec::with_capacity(batch_size),
+        }
+    }
+
+    /// Append a new batch in `stream_idx`
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+        self.batches[stream_idx].push_back(batch)
+    }
+
+    /// Push `row_idx` from the most recently appended batch in `stream_idx`

Review Comment:
   Also, I think it is a requirement that the `row_idx` values are pushed in order, right?



##########
datafusion/core/src/physical_plan/sorts/cursor.rs:
##########
@@ -110,3 +110,24 @@ impl Ord for SortKeyCursor {
         }
     }
 }
+

Review Comment:
   What is the plan for the `Cursor` trait? As in, why make a trait that has only a single impl?



##########
datafusion/core/src/physical_plan/sorts/stream.rs:
##########
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::cursor::SortKeyCursor;
+use crate::physical_plan::SendableRecordBatchStream;
+use crate::physical_plan::{PhysicalExpr, PhysicalSortExpr};
+use arrow::datatypes::Schema;
+use arrow::record_batch::RecordBatch;
+use arrow::row::{RowConverter, SortField};
+use futures::stream::{Fuse, StreamExt};
+use std::sync::Arc;
+use std::task::{ready, Context, Poll};
+
+/// A [`Stream`](futures::Stream) that has multiple partitions that can
+/// be polled separately but not concurrently
+///
+/// Used by sort preserving merge to decouple the cursor merging logic from
+/// the source of the cursors, the intention being to allow preserving

Review Comment:
   👍 
   
   So the idea is there will be a specialized implementation of cursors for row and single column primitives, etc?



##########
datafusion/core/src/physical_plan/sorts/builder.rs:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::common::Result;
+use crate::physical_plan::sorts::index::RowIndex;
+use arrow::array::{make_array, MutableArrayData};
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use std::collections::VecDeque;
+
+/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
+#[derive(Debug)]
+pub struct BatchBuilder {
+    /// The schema of the RecordBatches yielded by this stream
+    schema: SchemaRef,
+    /// For each input stream maintain a dequeue of RecordBatches
+    ///
+    /// Exhausted batches will be popped off the front once all
+    /// their rows have been yielded to the output
+    batches: Vec<VecDeque<RecordBatch>>,
+
+    /// The accumulated row indexes for the next record batch
+    indices: Vec<RowIndex>,
+}
+
+impl BatchBuilder {
+    /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
+    pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
+        let batches = (0..stream_count).map(|_| VecDeque::new()).collect();
+
+        Self {
+            schema,
+            batches,
+            indices: Vec::with_capacity(batch_size),
+        }
+    }
+
+    /// Append a new batch in `stream_idx`
+    pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
+        self.batches[stream_idx].push_back(batch)
+    }
+
+    /// Push `row_idx` from the most recently appended batch in `stream_idx`

Review Comment:
   ```suggestion
       /// Push `row_idx` from the most recently appended batch in `stream_idx`
       /// into the queue of rows to generate as output
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org