Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/04/10 17:51:36 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5897: Specialize Primitive Cursor -- make sorts / merges on a single column faster

alamb commented on code in PR #5897:
URL: https://github.com/apache/arrow-datafusion/pull/5897#discussion_r1161935087


##########
datafusion/core/src/physical_plan/sorts/merge.rs:
##########
@@ -37,8 +52,16 @@ pub(crate) fn streaming_merge(
     tracking_metrics: MemTrackingMetrics,
     batch_size: usize,
 ) -> Result<SendableRecordBatchStream> {
-    let streams = RowCursorStream::try_new(schema.as_ref(), expressions, streams)?;
+    if expressions.len() == 1 {

Review Comment:
   I think it is worth adding a comment here that explains the rationale for future readers. Something like:
   
   ```suggestion
       // special case single column primitives to avoid overhead of runtime comparators
       if expressions.len() == 1 {
   ```
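   
   To illustrate the kind of overhead I have in mind, here is a minimal sketch, not the code in this PR. `DynComparator`, `build_dyn_comparator` and `compare_typed` are hypothetical names, and plain `Vec<i64>` stands in for Arrow arrays. The first form goes through a boxed closure on every comparison, while the second is monomorphized per type so the compiler can inline the comparison.
   
   ```rust
   use std::cmp::Ordering;

   /// Hypothetical stand-in for a comparator that is built at runtime
   /// and invoked through dynamic dispatch on every comparison.
   type DynComparator = Box<dyn Fn(usize, usize) -> Ordering>;

   /// Builds a boxed comparator over two columns (plain vectors here,
   /// as an assumption; real code would operate on Arrow arrays).
   fn build_dyn_comparator(left: Vec<i64>, right: Vec<i64>) -> DynComparator {
       Box::new(move |i, j| left[i].cmp(&right[j]))
   }

   /// Statically typed comparison: one copy is generated per `T`,
   /// so there is no indirect call in the hot path.
   fn compare_typed<T: Ord>(left: &[T], right: &[T], i: usize, j: usize) -> Ordering {
       left[i].cmp(&right[j])
   }
   ```
   
   Both forms return the same ordering; the difference is only in how the call is dispatched.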



##########
datafusion/core/src/physical_plan/sorts/cursor.rs:
##########
@@ -93,3 +96,232 @@ impl Cursor for RowCursor {
         t
     }
 }
+
+/// A cursor over sorted, nullable [`ArrowNativeTypeOp`]
+///
+/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
+#[derive(Debug)]
+pub struct PrimitiveCursor<T: ArrowNativeTypeOp> {
+    values: ScalarBuffer<T>,
+    offset: usize,
+    // If nulls first, the first non-null index
+    // Otherwise, the first null index
+    null_threshold: usize,
+    options: SortOptions,
+}
+
+impl<T: ArrowNativeTypeOp> PrimitiveCursor<T> {
+    /// Create a new [`PrimitiveCursor`] from the provided `values` sorted according to `options`
+    pub fn new(options: SortOptions, values: ScalarBuffer<T>, null_count: usize) -> Self {
+        assert!(null_count <= values.len());
+
+        let null_threshold = match options.nulls_first {
+            true => null_count,
+            false => values.len() - null_count,
+        };
+
+        Self {
+            values,
+            offset: 0,
+            null_threshold,
+            options,
+        }
+    }
+
+    fn is_null(&self) -> bool {
+        (self.offset < self.null_threshold) == self.options.nulls_first
+    }
+
+    fn value(&self) -> T {
+        self.values[self.offset]
+    }
+}
+
+impl<T: ArrowNativeTypeOp> PartialEq for PrimitiveCursor<T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other).is_eq()
+    }
+}
+
+impl<T: ArrowNativeTypeOp> Eq for PrimitiveCursor<T> {}
+impl<T: ArrowNativeTypeOp> PartialOrd for PrimitiveCursor<T> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<T: ArrowNativeTypeOp> Ord for PrimitiveCursor<T> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match (self.is_null(), other.is_null()) {
+            (true, true) => Ordering::Equal,
+            (true, false) => match self.options.nulls_first {
+                true => Ordering::Less,
+                false => Ordering::Greater,
+            },
+            (false, true) => match self.options.nulls_first {
+                true => Ordering::Greater,
+                false => Ordering::Less,
+            },
+            (false, false) => {
+                let s_v = self.value();
+                let o_v = other.value();
+
+                match self.options.descending {

Review Comment:
   I wonder if it would be worth also specializing on const parameters for `NULLS_FIRST` and `ASC`/`DESC`, to avoid what looks like runtime branching overhead in the hot path.
   
   Maybe it doesn't make any practical difference.
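   
   A rough sketch of what I mean, under some simplifying assumptions: `ConstCursor` is a hypothetical name, `Vec<T>` stands in for `ScalarBuffer<T>`, and `Ord` stands in for `ArrowNativeTypeOp`. The sort options become const generic parameters, so each combination gets its own monomorphized `compare` and the `if` branches on them can be resolved at compile time. Whether that is measurable would need a benchmark.
   
   ```rust
   use std::cmp::Ordering;

   /// Hypothetical cursor whose sort options are const generics
   /// rather than a runtime `SortOptions` field.
   pub struct ConstCursor<T, const NULLS_FIRST: bool, const DESCENDING: bool> {
       values: Vec<T>,
       offset: usize,
       // If NULLS_FIRST, the first non-null index; otherwise the first null index
       null_threshold: usize,
   }

   impl<T: Copy + Ord, const NULLS_FIRST: bool, const DESCENDING: bool>
       ConstCursor<T, NULLS_FIRST, DESCENDING>
   {
       pub fn new(values: Vec<T>, null_count: usize) -> Self {
           assert!(null_count <= values.len());
           let null_threshold = if NULLS_FIRST {
               null_count
           } else {
               values.len() - null_count
           };
           Self { values, offset: 0, null_threshold }
       }

       /// Move to the next row.
       pub fn advance(&mut self) {
           self.offset += 1;
       }

       fn is_null(&self) -> bool {
           (self.offset < self.null_threshold) == NULLS_FIRST
       }

       /// Same ordering logic as the `Ord` impl in the diff, but the
       /// option checks are on constants instead of struct fields.
       pub fn compare(&self, other: &Self) -> Ordering {
           match (self.is_null(), other.is_null()) {
               (true, true) => Ordering::Equal,
               (true, false) => {
                   if NULLS_FIRST { Ordering::Less } else { Ordering::Greater }
               }
               (false, true) => {
                   if NULLS_FIRST { Ordering::Greater } else { Ordering::Less }
               }
               (false, false) => {
                   let s_v = self.values[self.offset];
                   let o_v = other.values[other.offset];
                   if DESCENDING { o_v.cmp(&s_v) } else { s_v.cmp(&o_v) }
               }
           }
       }
   }
   ```
   
   One side effect of this design is that cursors with different options become different types, so comparing cursors with mismatched `SortOptions` would no longer compile. The cost is that the call site has to dispatch over four option combinations per primitive type.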



##########
datafusion/core/src/physical_plan/sorts/stream.rs:
##########
@@ -139,3 +142,67 @@ impl PartitionedStream for RowCursorStream {
         }))
     }
 }
+
+pub struct PrimitiveCursorStream<T: ArrowPrimitiveType> {

Review Comment:
   ```suggestion
   /// Specialized stream for sorts on single primitive columns
   pub struct PrimitiveCursorStream<T: ArrowPrimitiveType> {
   ```



##########
datafusion/core/src/physical_plan/sorts/cursor.rs:
##########
@@ -93,3 +96,232 @@ impl Cursor for RowCursor {
         t
     }
 }
+
+/// A cursor over sorted, nullable [`ArrowNativeTypeOp`]
+///
+/// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering
+#[derive(Debug)]
+pub struct PrimitiveCursor<T: ArrowNativeTypeOp> {
+    values: ScalarBuffer<T>,
+    offset: usize,
+    // If nulls first, the first non-null index
+    // Otherwise, the first null index
+    null_threshold: usize,

Review Comment:
   Is there any reason it is called `null_threshold` rather than `null_index`, given it is a null index? 🤔
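   
   For reference, here is a small standalone sketch of how I read the field, copying the logic from `new` and `is_null` in the diff into free functions. The function names and `null_mask` are hypothetical, added only for illustration. Note that with nulls first the value is the first non-null index, which may be the reason for the more neutral name.
   
   ```rust
   /// Mirrors the `null_threshold` computation in `PrimitiveCursor::new`.
   fn null_threshold(len: usize, null_count: usize, nulls_first: bool) -> usize {
       assert!(null_count <= len);
       if nulls_first {
           null_count // first non-null index
       } else {
           len - null_count // first null index
       }
   }

   /// Mirrors `PrimitiveCursor::is_null` for a given offset.
   fn is_null(offset: usize, null_threshold: usize, nulls_first: bool) -> bool {
       (offset < null_threshold) == nulls_first
   }

   /// Hypothetical helper: the null flag for every offset in a sorted column.
   fn null_mask(len: usize, null_count: usize, nulls_first: bool) -> Vec<bool> {
       let threshold = null_threshold(len, null_count, nulls_first);
       (0..len)
           .map(|offset| is_null(offset, threshold, nulls_first))
           .collect()
   }
   ```
   
   For a column of 5 values with 2 nulls, the threshold is 2 when nulls sort first (offsets 0 and 1 are null) and 3 when nulls sort last (offsets 3 and 4 are null).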


