Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/30 12:16:34 UTC

[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #2645: Move `SortKeyCursor` and `RowIndex` into modules, add `sort_key_cursor` test

tustvold commented on code in PR #2645:
URL: https://github.com/apache/arrow-datafusion/pull/2645#discussion_r884763644


##########
datafusion/core/src/physical_plan/sorts/index.rs:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// A `RowIndex` identifies a specific row in a logical stream.
+///
+/// Each stream is identified by an `stream_idx` and is formed from a
+/// sequence of RecordBatches batches, each of which is identified by
+/// a unique `batch_idx` within that stream.
+///
+/// This is used by `SortPreservingMergeStream` to identify which
+/// the order of the tuples in the final sorted output stream.
+///
+/// ```text

Review Comment:
   This diagram seems to imply that all streams must have the same number of batches; I'm not sure this is the case.
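
To illustrate the concern, here is a minimal, std-only sketch in which two streams carry different numbers of batches. The `RowIndex` struct below is a hypothetical stand-in, not the type from the PR: `stream_idx` and `batch_idx` come from the doc comment above, while `row_idx` and the helper `stream_row_indexes` are assumptions made for illustration.

```rust
/// Hypothetical stand-in for the `RowIndex` under review.
/// `row_idx` is an assumed field name; it is not shown in the quoted diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RowIndex {
    pub stream_idx: usize,
    pub batch_idx: usize,
    pub row_idx: usize,
}

/// Enumerates every row of one stream, given the row count of each of
/// its batches. The number of batches is simply the slice length, so
/// nothing ties it to the batch count of any other stream.
pub fn stream_row_indexes(stream_idx: usize, batch_row_counts: &[usize]) -> Vec<RowIndex> {
    batch_row_counts
        .iter()
        .enumerate()
        .flat_map(move |(batch_idx, &rows)| {
            (0..rows).map(move |row_idx| RowIndex {
                stream_idx,
                batch_idx,
                row_idx,
            })
        })
        .collect()
}
```

Calling `stream_row_indexes(0, &[2, 1, 3])` and `stream_row_indexes(1, &[4])` yields one stream with three batches and another with a single batch; `batch_idx` only needs to be unique within its own stream.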



##########
datafusion/core/tests/sort_key_cursor.rs:
##########
@@ -0,0 +1,152 @@
+//! Contains tests for SortKeyCursor
+
+use std::{cmp::Ordering, sync::Arc};
+
+use arrow::{array::Int64Array, compute::SortOptions, record_batch::RecordBatch};
+use datafusion::physical_plan::sorts::{RowIndex, SortKeyCursor};
+use datafusion_physical_expr::expressions::col;
+
+/// Compares [`RowIndex`]es with a vector of strings, the result of
+/// pretty formatting the [`RowIndex`]es. This is a macro so errors

Review Comment:
   `unwrap()` will give you a stack trace, though. I'm not a massive fan of macros, as they are hard to read and slow to compile.
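
One possible non-macro alternative, sketched here under assumptions rather than taken from the PR: a plain function marked `#[track_caller]`, so that a failing assertion inside it is reported at the call site. The name `assert_lines` and the use of `Debug` formatting are hypothetical; the PR's own `format_as_strings` helper is not shown in the quoted diff and is not reproduced here.

```rust
use std::fmt::Debug;

/// Hypothetical function counterpart to an assertion macro.
/// `#[track_caller]` makes a panic raised by the assertion below report
/// the file and line of the caller rather than of this function.
#[track_caller]
pub fn assert_lines<T: Debug>(expected: &[&str], actual: &[T]) {
    // Convert both sides to owned strings so they compare directly.
    let expected_lines: Vec<String> = expected.iter().map(|s| s.to_string()).collect();
    let actual_lines: Vec<String> = actual.iter().map(|v| format!("{:?}", v)).collect();

    assert_eq!(
        expected_lines, actual_lines,
        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
        expected_lines, actual_lines
    );
}
```

A call such as `assert_lines(&["1", "2"], &[1, 2])` passes, while a mismatch panics with the location of the calling test line.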



##########
datafusion/core/src/physical_plan/sorts/index.rs:
##########
@@ -0,0 +1,59 @@
+/// A `RowIndex` identifies a specific row in a logical stream.
+///
+/// Each stream is identified by an `stream_idx` and is formed from a
+/// sequence of RecordBatches batches, each of which is identified by
+/// a unique `batch_idx` within that stream.
+///
+/// This is used by `SortPreservingMergeStream` to identify which
+/// the order of the tuples in the final sorted output stream.
+///
+/// ```text
+/// ┌────┐ ┌────┐ ┌────┐           RecordBatch
+/// │    │ │    │ │    │
+/// │ C1 │ │... │ │ CN │◀─────── (batch_idx = 0)
+/// │    │ │    │ │    │
+/// └────┘ └────┘ └────┘
+///
+/// ┌────┐ ┌────┐ ┌────┐           RecordBatch
+/// │    │ │    │ │    │
+/// │ C1 │ │... │ │ CN │◀─────── (batch_idx = 1)
+/// │    │ │    │ │    │
+/// └────┘ └────┘ └────┘
+///
+///          ...
+///
+/// ┌────┐ ┌────┐ ┌────┐           RecordBatch
+/// │    │ │    │ │    │
+/// │ C1 │ │... │ │ CN │◀────── (batch_idx = N-1)

Review Comment:
   ```suggestion
   /// │ C1 │ │... │ │ CN │◀────── (batch_idx = M-1)
   ```
   I think the two dimensions (columns per batch and batches per stream) are independent, so the last `batch_idx` should not reuse `N`?



##########
datafusion/core/tests/sort_key_cursor.rs:
##########
@@ -0,0 +1,152 @@
+//! Contains tests for SortKeyCursor
+
+use std::{cmp::Ordering, sync::Arc};
+
+use arrow::{array::Int64Array, compute::SortOptions, record_batch::RecordBatch};
+use datafusion::physical_plan::sorts::{RowIndex, SortKeyCursor};
+use datafusion_physical_expr::expressions::col;
+
+/// Compares [`RowIndex`]es with a vector of strings, the result of
+/// pretty formatting the [`RowIndex`]es. This is a macro so errors
+/// appear on the correct line.
+///
+/// Designed so that failure output can be directly copy/pasted
+/// into the test code as expected results.
+///
+/// Expects to be called about like this:
+///
+/// `assert_indexes!(expected_indexes: &[&str], indexes: &[RowIndex])`
+#[macro_export]
+macro_rules! assert_indexes {
+    ($EXPECTED_LINES: expr, $INDEXES: expr) => {
+        let expected_lines: Vec<String> =
+            $EXPECTED_LINES.iter().map(|&s| s.into()).collect();
+
+        let actual_lines = format_as_strings($INDEXES);
+
+        assert_eq!(
+            expected_lines, actual_lines,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected_lines, actual_lines
+        );
+    };
+}
+
+#[test]
+fn test_single_column() {
+    let array1 = Int64Array::from(vec![Some(1), Some(2), Some(5), Some(6)]);
+    let batch1 = RecordBatch::try_from_iter(vec![("c1", Arc::new(array1) as _)]).unwrap();
+
+    let array2 = Int64Array::from(vec![Some(3), Some(4), Some(8), Some(9)]);
+    let batch2 = RecordBatch::try_from_iter(vec![("c1", Arc::new(array2) as _)]).unwrap();
+
+    let c1 = col("c1", &batch1.schema()).unwrap();
+    let sort_key = vec![c1];
+
+    let sort_options = Arc::new(vec![SortOptions::default()]);
+
+    let mut cursor1 =
+        SortKeyCursor::new(1, 0, &batch1, &sort_key, Arc::clone(&sort_options)).unwrap();
+    let mut cursor2 =
+        SortKeyCursor::new(2, 0, &batch2, &sort_key, Arc::clone(&sort_options)).unwrap();
+
+    let expected = vec![
+        "1: (0, 0)",
+        "1: (0, 1)",
+        "2: (0, 0)",
+        "2: (0, 1)",
+        "1: (0, 2)",
+        "1: (0, 3)",
+        "2: (0, 2)",
+        "2: (0, 3)",
+    ];
+
+    assert_indexes!(expected, run(&mut cursor1, &mut cursor2));
+}
+
+/// Runs the two cursors to completion, sorting them, and
+/// returning the sorted order of rows that would have produced
+fn run(cursor1: &mut SortKeyCursor, cursor2: &mut SortKeyCursor) -> Vec<RowIndex> {

Review Comment:
   This code doesn't seem to be used by SortPreservingMerge; is this intentional?
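
For context, the kind of two-way merge that a helper like `run` has to perform can be sketched with the standard library alone. Everything below is an illustrative assumption, not the PR's implementation: `Cursor` is a hypothetical stand-in for `SortKeyCursor` over a single `i64` column, and the `stream: (batch, row)` string format is inferred from the expected values in `test_single_column`.

```rust
/// Hypothetical stand-in for a cursor over one sorted batch of a stream.
pub struct Cursor<'a> {
    stream_idx: usize,
    batch_idx: usize,
    values: &'a [i64],
    pos: usize,
}

impl<'a> Cursor<'a> {
    pub fn new(stream_idx: usize, batch_idx: usize, values: &'a [i64]) -> Self {
        Cursor { stream_idx, batch_idx, values, pos: 0 }
    }

    /// Returns the value at the current position, or `None` when exhausted.
    fn peek(&self) -> Option<i64> {
        self.values.get(self.pos).copied()
    }

    /// Formats the current row as "stream: (batch, row)" and moves on.
    fn advance(&mut self) -> String {
        let line = format!("{}: ({}, {})", self.stream_idx, self.batch_idx, self.pos);
        self.pos += 1;
        line
    }
}

/// Drains both cursors, always taking the smaller head value
/// (ties go to the first cursor), and records the order of rows taken.
pub fn merge_order(a: &mut Cursor<'_>, b: &mut Cursor<'_>) -> Vec<String> {
    let mut out = Vec::new();
    loop {
        let take_a = match (a.peek(), b.peek()) {
            (None, None) => break,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (Some(x), Some(y)) => x <= y,
        };
        out.push(if take_a { a.advance() } else { b.advance() });
    }
    out
}
```

With stream 1 holding `[1, 2, 5, 6]` and stream 2 holding `[3, 4, 8, 9]`, this sketch produces the same eight-line order as the `expected` vector in the quoted test.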


