Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/28 17:08:59 UTC

[GitHub] [arrow] benibus opened a new pull request, #14753: ARROW-18395: [C++] Move select-k implementation into separate module

benibus opened a new pull request, #14753:
URL: https://github.com/apache/arrow/pull/14753

   See: [ARROW-18395](https://issues.apache.org/jira/browse/ARROW-18395)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14753:
URL: https://github.com/apache/arrow/pull/14753#discussion_r1047661511


##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys) {
+  std::vector<SortField> fields;
+  std::unordered_set<int> seen;
+  fields.reserve(sort_keys.size());
+  seen.reserve(sort_keys.size());
+
+  for (const auto& sort_key : sort_keys) {
+    RETURN_NOT_OK(CheckNonNested(sort_key.target));
+
+    ARROW_ASSIGN_OR_RAISE(auto match,
+                          PrependInvalidColumn(sort_key.target.FindOne(schema)));
+    if (seen.insert(match[0]).second) {
+      fields.push_back({match[0], sort_key.order});
+    }
+  }
+  return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys,
+    ResolvedSortKeyFactory&& factory) {
+  ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+  std::vector<ResolvedSortKey> resolved;
+  resolved.reserve(fields.size());
+  std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+  return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+  return ResolveSortKeys<ResolvedSortKey>(
+      *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+        return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+      });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,

Review Comment:
   I think you could just do it in this PR.





[GitHub] [arrow] pitrou commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1351365114

   > Not quite sure if you think exposing the one common `ResolvedSortKey` in the header would be worthwhile - since I basically just copied it into select-k's `RecordBatchSelector`. In practice, there are 3 unique `ResolvedSortKey` definitions across 4 classes.
   
   Basically there should be two possible `ResolvedSortKey` implementations: one for record batch (contiguous) input, the other for table (chunked) input. So, yes, I think those two implementations should be put in the header.
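   The split between contiguous and chunked input can be sketched roughly as follows. This is only an illustration with toy types: `ContiguousSortKey`, `ChunkedSortKey`, `ChunkLocation`, `Value` and `CompareRows` are hypothetical names, and plain `std::vector<int>` stands in for Arrow arrays. The only idea taken from the diff is that each key type exposes a `LocationType`, so comparison code can be written once for both.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a record-batch sort key: the column is one
// contiguous array, so a row location is just an index.
struct ContiguousSortKey {
  using LocationType = int64_t;
  std::vector<int> values;

  int Value(LocationType loc) const {
    return values[static_cast<std::size_t>(loc)];
  }
};

// Hypothetical location for chunked input: which chunk, and where inside it.
struct ChunkLocation {
  int64_t chunk_index;
  int64_t index_in_chunk;
};

// Hypothetical stand-in for a table sort key: the column is split into chunks.
struct ChunkedSortKey {
  using LocationType = ChunkLocation;
  std::vector<std::vector<int>> chunks;

  int Value(const LocationType& loc) const {
    return chunks[static_cast<std::size_t>(loc.chunk_index)]
                 [static_cast<std::size_t>(loc.index_in_chunk)];
  }
};

// Three-way comparison written once against the LocationType alias.
// Returns -1, 0 or 1.
template <typename SortKey>
int CompareRows(const SortKey& key, const typename SortKey::LocationType& left,
                const typename SortKey::LocationType& right) {
  const int l = key.Value(left);
  const int r = key.Value(right);
  return (l > r) - (l < r);
}
```

   With this shape, a comparator template only depends on `SortKey::LocationType` and never needs to know whether the input was a batch or a table.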




[GitHub] [arrow] benibus commented on a diff in pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on code in PR #14753:
URL: https://github.com/apache/arrow/pull/14753#discussion_r1047659723


##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys) {
+  std::vector<SortField> fields;
+  std::unordered_set<int> seen;
+  fields.reserve(sort_keys.size());
+  seen.reserve(sort_keys.size());
+
+  for (const auto& sort_key : sort_keys) {
+    RETURN_NOT_OK(CheckNonNested(sort_key.target));
+
+    ARROW_ASSIGN_OR_RAISE(auto match,
+                          PrependInvalidColumn(sort_key.target.FindOne(schema)));
+    if (seen.insert(match[0]).second) {
+      fields.push_back({match[0], sort_key.order});
+    }
+  }
+  return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys,
+    ResolvedSortKeyFactory&& factory) {
+  ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+  std::vector<ResolvedSortKey> resolved;
+  resolved.reserve(fields.size());
+  std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+  return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+  return ResolveSortKeys<ResolvedSortKey>(
+      *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+        return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+      });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,

Review Comment:
   Agreed. I just opened an issue for it: https://github.com/apache/arrow/issues/14939
   
   Do you think that should be done first?





[GitHub] [arrow] pitrou commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1377468959

   Yes, it's certainly fine to fix the tests so as to match the new (and better) error message.




[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1371208872

   Probably worth noting that `GetTableColumn` (which I replaced in the last commit) isn't exactly equivalent to `FieldRef::GetOne`: the former (a) rejects nested fields and (b) doesn't search for multiple matching `FieldRef`s, whereas the latter calls `FieldRef::FindAll` under the hood.
   
   That being said, those constraints/optimizations don't currently apply to the `RecordBatch` functions, so I'm not quite sure what the call would be here.




[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1329662643

   Not sure if there are any strong opinions about the inline functions in `vector_sort_internal.h`... Some of the definitions could remain where they were if necessary.
   
   Also, I removed a bunch of redundant `#include`s in the implementations - although I don't really know if they were just cruft or meant to prevent future breakages, as quite a few things are being implicitly pulled in by two headers.




[GitHub] [arrow] pitrou commented on a diff in pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14753:
URL: https://github.com/apache/arrow/pull/14753#discussion_r1048475757


##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +457,243 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK/Rank implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+Result<std::vector<SortField>> FindSortKeys(const Schema& schema,
+                                            const std::vector<SortKey>& sort_keys);
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys,
+    ResolvedSortKeyFactory&& factory) {
+  ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+  std::vector<ResolvedSortKey> resolved;
+  resolved.reserve(fields.size());
+  std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+  return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+  return ResolveSortKeys<ResolvedSortKey>(
+      *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+        return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+      });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,
+                                                    const FieldRef& ref) {
+  if (ref.IsNested()) return nullptr;
+
+  if (auto name = ref.name()) {
+    return table.GetColumnByName(*name);
+  }
+
+  auto index = ref.field_path()->indices()[0];
+  if (index >= table.num_columns()) return nullptr;
+  return table.column(index);
+}
+
+// We could try to reproduce the concrete Array classes' facilities
+// (such as cached raw values pointer) in a separate hierarchy of
+// physical accessors, but doing so ends up too cumbersome.
+// Instead, we simply create the desired concrete Array objects.
+inline std::shared_ptr<Array> GetPhysicalArray(
+    const Array& array, const std::shared_ptr<DataType>& physical_type) {
+  auto new_data = array.data()->Copy();
+  new_data->type = physical_type;
+  return MakeArray(std::move(new_data));
+}
+
+inline ArrayVector GetPhysicalChunks(const ArrayVector& chunks,
+                                     const std::shared_ptr<DataType>& physical_type) {
+  ArrayVector physical(chunks.size());
+  std::transform(chunks.begin(), chunks.end(), physical.begin(),
+                 [&](const std::shared_ptr<Array>& array) {
+                   return GetPhysicalArray(*array, physical_type);
+                 });
+  return physical;
+}
+
+inline ArrayVector GetPhysicalChunks(const ChunkedArray& chunked_array,
+                                     const std::shared_ptr<DataType>& physical_type) {
+  return GetPhysicalChunks(chunked_array.chunks(), physical_type);
+}
+
+// Compare two records in a single column (either from a batch or table)
+template <typename ResolvedSortKey>
+struct ColumnComparator {
+  using Location = typename ResolvedSortKey::LocationType;
+
+  ColumnComparator(const ResolvedSortKey& sort_key, NullPlacement null_placement)
+      : sort_key_(sort_key), null_placement_(null_placement) {}
+
+  virtual ~ColumnComparator() = default;
+
+  virtual int Compare(const Location& left, const Location& right) const = 0;
+
+  ResolvedSortKey sort_key_;
+  NullPlacement null_placement_;
+};
+
+template <typename ResolvedSortKey, typename Type>
+struct ConcreteColumnComparator : public ColumnComparator<ResolvedSortKey> {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using Location = typename ResolvedSortKey::LocationType;
+
+  using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+  int Compare(const Location& left, const Location& right) const override {
+    const auto& sort_key = this->sort_key_;
+
+    const auto chunk_left = sort_key.template GetChunk<ArrayType>(left);
+    const auto chunk_right = sort_key.template GetChunk<ArrayType>(right);
+    if (sort_key.null_count > 0) {
+      const bool is_null_left = chunk_left.IsNull();
+      const bool is_null_right = chunk_right.IsNull();
+      if (is_null_left && is_null_right) {
+        return 0;
+      } else if (is_null_left) {
+        return this->null_placement_ == NullPlacement::AtStart ? -1 : 1;
+      } else if (is_null_right) {
+        return this->null_placement_ == NullPlacement::AtStart ? 1 : -1;
+      }
+    }
+    return CompareTypeValues<Type>(chunk_left.Value(), chunk_right.Value(),
+                                   sort_key.order, this->null_placement_);
+  }
+};
+
+template <typename ResolvedSortKey>
+struct ConcreteColumnComparator<ResolvedSortKey, NullType>
+    : public ColumnComparator<ResolvedSortKey> {
+  using Location = typename ResolvedSortKey::LocationType;
+
+  using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+  int Compare(const Location& left, const Location& right) const override { return 0; }
+};
+
+// Compare two records in the same RecordBatch or Table
+// (indexing is handled through ResolvedSortKey)
+template <typename ResolvedSortKey>
+class MultipleKeyComparator {
+ public:
+  using Location = typename ResolvedSortKey::LocationType;
+
+  MultipleKeyComparator(const std::vector<ResolvedSortKey>& sort_keys,
+                        NullPlacement null_placement)
+      : sort_keys_(sort_keys), null_placement_(null_placement) {
+    status_ &= MakeComparators();
+  }
+
+  Status status() const { return status_; }
+
+  // Returns true if the left-th value should be ordered before the
+  // right-th value, false otherwise. The start_sort_key_index-th
+  // sort key and subsequent sort keys are used for comparison.
+  bool Compare(const Location& left, const Location& right, size_t start_sort_key_index) {
+    return CompareInternal(left, right, start_sort_key_index) < 0;
+  }
+
+  bool Equals(const Location& left, const Location& right, size_t start_sort_key_index) {
+    return CompareInternal(left, right, start_sort_key_index) == 0;
+  }
+
+ private:
+  struct ColumnComparatorFactory {
+#define VISIT(TYPE) \
+  Status Visit(const TYPE& type) { return VisitGeneric(type); }
+
+    VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+    VISIT(NullType)
+
+#undef VISIT
+
+    Status Visit(const DataType& type) {
+      return Status::TypeError("Unsupported type for batch or table sorting: ",
+                               type.ToString());
+    }
+
+    template <typename Type>
+    Status VisitGeneric(const Type& type) {
+      res.reset(
+          new ConcreteColumnComparator<ResolvedSortKey, Type>{sort_key, null_placement});
+      return Status::OK();
+    }
+
+    const ResolvedSortKey& sort_key;
+    NullPlacement null_placement;
+    std::unique_ptr<ColumnComparator<ResolvedSortKey>> res;
+  };
+
+  Status MakeComparators() {
+    column_comparators_.reserve(sort_keys_.size());
+
+    for (const auto& sort_key : sort_keys_) {
+      ColumnComparatorFactory factory{sort_key, null_placement_, nullptr};
+      RETURN_NOT_OK(VisitTypeInline(*sort_key.type, &factory));
+      column_comparators_.push_back(std::move(factory.res));
+    }
+    return Status::OK();
+  }
+
+  // Compare two records in the same table and return -1, 0 or 1.
+  //
+  // -1: The left is less than the right.
+  // 0: The left equals to the right.
+  // 1: The left is greater than the right.
+  //
+  // This supports null and NaN. Null is processed in this and NaN
+  // is processed in CompareTypeValue().
+  int CompareInternal(const Location& left, const Location& right,
+                      size_t start_sort_key_index) {
+    const auto num_sort_keys = sort_keys_.size();
+    for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+      const int r = column_comparators_[i]->Compare(left, right);
+      if (r != 0) {
+        return r;
+      }
+    }
+    return 0;
+  }
+
+  const std::vector<ResolvedSortKey>& sort_keys_;
+  const NullPlacement null_placement_;
+  std::vector<std::unique_ptr<ColumnComparator<ResolvedSortKey>>> column_comparators_;
+  Status status_;
+};
+
+inline Result<std::shared_ptr<ArrayData>> MakeMutableUInt64Array(
+    std::shared_ptr<DataType> out_type, int64_t length, MemoryPool* memory_pool) {

Review Comment:
   Is `out_type` unused here?





[GitHub] [arrow] pitrou commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1371215821

   Interesting. I don't think we intended to support nested fields (we would have to decide whether to take into account outer-level nulls or not), so we should perhaps disallow them everywhere for now.
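   A rough sketch of what "disallow nested keys everywhere" could look like, using toy types rather than Arrow's. `ToyFieldRef`, `Lookup` and `FindSortColumn` are hypothetical; the schema is just a list of column names, and returning the first name match is a simplification. The two message prefixes are the ones quoted in the diff; the rest of the "no match" text is made up for the example.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical field reference: a path of names. More than one component
// means the reference points inside a nested (e.g. struct) column.
struct ToyFieldRef {
  std::vector<std::string> path;
  bool IsNested() const { return path.size() > 1; }
};

// Hypothetical result type: a column index, or a non-empty error message.
struct Lookup {
  int index;
  std::string error;
  bool ok() const { return error.empty(); }
};

// Reject nested references up front, then look the name up in the schema.
Lookup FindSortColumn(const std::vector<std::string>& schema,
                      const ToyFieldRef& ref) {
  if (ref.IsNested()) {
    return {-1, "Nested keys not supported for SortKeys"};
  }
  if (!ref.path.empty()) {
    for (std::size_t i = 0; i < schema.size(); ++i) {
      if (schema[i] == ref.path[0]) return {static_cast<int>(i), ""};
    }
  }
  // Prefix the failure so callers can tell it came from sort key resolution.
  const std::string name = ref.path.empty() ? "" : ref.path[0];
  return {-1, "Invalid sort key column: no match for '" + name + "'"};
}
```

   Doing the nested check in one shared helper would mean the record batch and table paths reject the same inputs with the same message.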




[GitHub] [arrow] pitrou commented on a diff in pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14753:
URL: https://github.com/apache/arrow/pull/14753#discussion_r1039786012


##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(

Review Comment:
   The definition for this can be moved into a `.cc` file IMHO.
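   For context, the deduplication step of the function being discussed can be shown in isolation. This is a simplified sketch, not the PR's code: `Order` and `DeduplicateSortFields` are hypothetical names, and the keys are assumed to be already resolved to field indices, so the schema lookup is left out. It keeps the first sort key seen for each column and drops later repeats, relying on `std::unordered_set::insert` reporting whether the element was new.

```cpp
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for the sort order enum.
enum class Order { Ascending, Descending };

struct SortField {
  int field_index;
  Order order;
};

// Keep only the first sort key per field index, preserving input order.
std::vector<SortField> DeduplicateSortFields(const std::vector<SortField>& keys) {
  std::vector<SortField> fields;
  std::unordered_set<int> seen;
  fields.reserve(keys.size());
  seen.reserve(keys.size());

  for (const auto& key : keys) {
    // insert() returns {iterator, bool}; the bool is true only for new elements.
    if (seen.insert(key.field_index).second) {
      fields.push_back(key);
    }
  }
  return fields;
}
```

   Nothing here depends on template parameters, which is why a function of this shape can be declared in the header and defined in a source file.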



##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys) {
+  std::vector<SortField> fields;
+  std::unordered_set<int> seen;
+  fields.reserve(sort_keys.size());
+  seen.reserve(sort_keys.size());
+
+  for (const auto& sort_key : sort_keys) {
+    RETURN_NOT_OK(CheckNonNested(sort_key.target));
+
+    ARROW_ASSIGN_OR_RAISE(auto match,
+                          PrependInvalidColumn(sort_key.target.FindOne(schema)));
+    if (seen.insert(match[0]).second) {
+      fields.push_back({match[0], sort_key.order});
+    }
+  }
+  return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys,
+    ResolvedSortKeyFactory&& factory) {
+  ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+  std::vector<ResolvedSortKey> resolved;
+  resolved.reserve(fields.size());
+  std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+  return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+  return ResolveSortKeys<ResolvedSortKey>(
+      *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+        return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+      });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,
+                                                    const FieldRef& ref) {
+  if (ref.IsNested()) return nullptr;
+
+  if (auto name = ref.name()) {
+    return table.GetColumnByName(*name);
+  }
+
+  auto index = ref.field_path()->indices()[0];
+  if (index >= table.num_columns()) return nullptr;
+  return table.column(index);
+}
+
+// We could try to reproduce the concrete Array classes' facilities
+// (such as cached raw values pointer) in a separate hierarchy of
+// physical accessors, but doing so ends up too cumbersome.
+// Instead, we simply create the desired concrete Array objects.
+inline std::shared_ptr<Array> GetPhysicalArray(
+    const Array& array, const std::shared_ptr<DataType>& physical_type) {
+  auto new_data = array.data()->Copy();
+  new_data->type = physical_type;
+  return MakeArray(std::move(new_data));
+}
+
+inline ArrayVector GetPhysicalChunks(const ArrayVector& chunks,
+                                     const std::shared_ptr<DataType>& physical_type) {
+  ArrayVector physical(chunks.size());
+  std::transform(chunks.begin(), chunks.end(), physical.begin(),
+                 [&](const std::shared_ptr<Array>& array) {
+                   return GetPhysicalArray(*array, physical_type);
+                 });
+  return physical;
+}
+
+inline ArrayVector GetPhysicalChunks(const ChunkedArray& chunked_array,
+                                     const std::shared_ptr<DataType>& physical_type) {
+  return GetPhysicalChunks(chunked_array.chunks(), physical_type);
+}
+
+// Compare two records in a single column (either from a batch or table)
+template <typename ResolvedSortKey>
+struct ColumnComparator {
+  using Location = typename ResolvedSortKey::LocationType;
+
+  ColumnComparator(const ResolvedSortKey& sort_key, NullPlacement null_placement)
+      : sort_key_(sort_key), null_placement_(null_placement) {}
+
+  virtual ~ColumnComparator() = default;
+
+  virtual int Compare(const Location& left, const Location& right) const = 0;
+
+  ResolvedSortKey sort_key_;
+  NullPlacement null_placement_;
+};
+
+template <typename ResolvedSortKey, typename Type>
+struct ConcreteColumnComparator : public ColumnComparator<ResolvedSortKey> {
+  using ArrayType = typename TypeTraits<Type>::ArrayType;
+  using Location = typename ResolvedSortKey::LocationType;
+
+  using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+  int Compare(const Location& left, const Location& right) const override {
+    const auto& sort_key = this->sort_key_;
+
+    const auto chunk_left = sort_key.template GetChunk<ArrayType>(left);
+    const auto chunk_right = sort_key.template GetChunk<ArrayType>(right);
+    if (sort_key.null_count > 0) {
+      const bool is_null_left = chunk_left.IsNull();
+      const bool is_null_right = chunk_right.IsNull();
+      if (is_null_left && is_null_right) {
+        return 0;
+      } else if (is_null_left) {
+        return this->null_placement_ == NullPlacement::AtStart ? -1 : 1;
+      } else if (is_null_right) {
+        return this->null_placement_ == NullPlacement::AtStart ? 1 : -1;
+      }
+    }
+    return CompareTypeValues<Type>(chunk_left.Value(), chunk_right.Value(),
+                                   sort_key.order, this->null_placement_);
+  }
+};
+
+template <typename ResolvedSortKey>
+struct ConcreteColumnComparator<ResolvedSortKey, NullType>
+    : public ColumnComparator<ResolvedSortKey> {
+  using Location = typename ResolvedSortKey::LocationType;
+
+  using ColumnComparator<ResolvedSortKey>::ColumnComparator;
+
+  int Compare(const Location& left, const Location& right) const override { return 0; }
+};
+
+// Compare two records in the same RecordBatch or Table
+// (indexing is handled through ResolvedSortKey)
+template <typename ResolvedSortKey>
+class MultipleKeyComparator {
+ public:
+  using Location = typename ResolvedSortKey::LocationType;
+
+  MultipleKeyComparator(const std::vector<ResolvedSortKey>& sort_keys,
+                        NullPlacement null_placement)
+      : sort_keys_(sort_keys), null_placement_(null_placement) {
+    status_ &= MakeComparators();
+  }
+
+  Status status() const { return status_; }
+
+  // Returns true if the left-th value should be ordered before the
+  // right-th value, false otherwise. The start_sort_key_index-th
+  // sort key and subsequent sort keys are used for comparison.
+  bool Compare(const Location& left, const Location& right, size_t start_sort_key_index) {
+    return CompareInternal(left, right, start_sort_key_index) < 0;
+  }
+
+  bool Equals(const Location& left, const Location& right, size_t start_sort_key_index) {
+    return CompareInternal(left, right, start_sort_key_index) == 0;
+  }
+
+ private:
+  struct ColumnComparatorFactory {
+#define VISIT(TYPE) \
+  Status Visit(const TYPE& type) { return VisitGeneric(type); }
+
+    VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+    VISIT(NullType)
+
+#undef VISIT
+
+    Status Visit(const DataType& type) {
+      return Status::TypeError("Unsupported type for batch or table sorting: ",
+                               type.ToString());
+    }
+
+    template <typename Type>
+    Status VisitGeneric(const Type& type) {
+      res.reset(
+          new ConcreteColumnComparator<ResolvedSortKey, Type>{sort_key, null_placement});
+      return Status::OK();
+    }
+
+    const ResolvedSortKey& sort_key;
+    NullPlacement null_placement;
+    std::unique_ptr<ColumnComparator<ResolvedSortKey>> res;
+  };
+
+  Status MakeComparators() {
+    column_comparators_.reserve(sort_keys_.size());
+
+    for (const auto& sort_key : sort_keys_) {
+      ColumnComparatorFactory factory{sort_key, null_placement_, nullptr};
+      RETURN_NOT_OK(VisitTypeInline(*sort_key.type, &factory));
+      column_comparators_.push_back(std::move(factory.res));
+    }
+    return Status::OK();
+  }
+
+  // Compare two records in the same table and return -1, 0 or 1.
+  //
+  // -1: The left is less than the right.
+  // 0: The left equals the right.
+  // 1: The left is greater than the right.
+  //
+  // This supports null and NaN. Nulls are handled in
+  // ConcreteColumnComparator::Compare() and NaNs in CompareTypeValues().
+  int CompareInternal(const Location& left, const Location& right,
+                      size_t start_sort_key_index) {
+    const auto num_sort_keys = sort_keys_.size();
+    for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+      const int r = column_comparators_[i]->Compare(left, right);
+      if (r != 0) {
+        return r;
+      }
+    }
+    return 0;
+  }
+
+  const std::vector<ResolvedSortKey>& sort_keys_;
+  const NullPlacement null_placement_;
+  std::vector<std::unique_ptr<ColumnComparator<ResolvedSortKey>>> column_comparators_;
+  Status status_;
+};
+
+// Sort a batch using a single sort and multiple-key comparisons.
+class MultipleKeyRecordBatchSorter : public TypeVisitor {

Review Comment:
   I'm not sure it's useful to expose this internal class in a `.h` file. By the looks of it, the select-k kernels only use `MultipleKeyRecordBatchSorter::ResolvedSortKey`, so perhaps only that type needs to be shared?
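
To illustrate the suggestion, here is a minimal single-file sketch of the layout I have in mind. All names (`sketch::ResolvedKey`, `sketch::SortIndices`, `BatchSorter`) are hypothetical stand-ins, not the actual Arrow types: the small key struct that both source files need stays in the shared header, while the sorter class moves into an anonymous namespace in the `.cc` file and is reached only through a free function.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// ---- Part that would stay in the shared internal header (hypothetical) ----
namespace sketch {

enum class Order { Ascending, Descending };

// The only type both the sort and select-k sources need to see.
struct ResolvedKey {
  const std::vector<int>* values;  // stand-in for a resolved column
  Order order;
};

// Declaration only: callers get a function, not the sorter class.
std::vector<uint64_t> SortIndices(const ResolvedKey& key);

}  // namespace sketch

// ---- Part that would move into the .cc file (hypothetical) ----
namespace sketch {
namespace {

// Internal class: not visible to other translation units.
class BatchSorter {
 public:
  explicit BatchSorter(const ResolvedKey& key) : key_(key) {}

  std::vector<uint64_t> Run() const {
    std::vector<uint64_t> indices(key_.values->size());
    std::iota(indices.begin(), indices.end(), uint64_t{0});
    const auto& v = *key_.values;
    // Stable sort of row indices by the key column, honoring the order.
    std::stable_sort(indices.begin(), indices.end(),
                     [&](uint64_t left, uint64_t right) {
                       return key_.order == Order::Ascending ? v[left] < v[right]
                                                             : v[right] < v[left];
                     });
    return indices;
  }

 private:
  ResolvedKey key_;
};

}  // namespace

std::vector<uint64_t> SortIndices(const ResolvedKey& key) {
  return BatchSorter(key).Run();
}

}  // namespace sketch
```

With this split, a change to the sorter class only recompiles the one `.cc` file, and the header exposes just the key type and a function declaration.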




Review Comment:
   (The same perhaps applies to the other classes above.)



##########
cpp/src/arrow/compute/kernels/vector_select_k.cc:
##########
@@ -0,0 +1,858 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <queue>
+
+#include "arrow/compute/kernels/vector_sort_internal.h"
+#include "arrow/compute/registry.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute::internal {
+
+namespace {
+
+// ----------------------------------------------------------------------
+// TopK/BottomK implementations
+
+const SelectKOptions* GetDefaultSelectKOptions() {
+  static const auto kDefaultSelectKOptions = SelectKOptions::Defaults();
+  return &kDefaultSelectKOptions;
+}
+
+const FunctionDoc select_k_unstable_doc(
+    "Select the indices of the first `k` ordered elements from the input",
+    ("This function selects an array of indices of the first `k` ordered elements\n"
+     "from the `input` array, record batch or table specified in the column keys\n"
+     "(`options.sort_keys`). Output is not guaranteed to be stable.\n"
+     "Null values are considered greater than any other value and are\n"
+     "therefore ordered at the end. For floating-point types, NaNs are considered\n"
+     "greater than any other non-null value, but smaller than null values."),
+    {"input"}, "SelectKOptions", /*options_required=*/true);
+
+Result<std::shared_ptr<ArrayData>> MakeMutableUInt64Array(
+    std::shared_ptr<DataType> out_type, int64_t length, MemoryPool* memory_pool) {
+  auto buffer_size = length * sizeof(uint64_t);
+  ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(buffer_size, memory_pool));
+  return ArrayData::Make(uint64(), length, {nullptr, std::move(data)}, /*null_count=*/0);
+}
+
+template <SortOrder order>
+class SelectKComparator {
+ public:
+  template <typename Type>
+  bool operator()(const Type& lval, const Type& rval);
+};
+
+template <>
+class SelectKComparator<SortOrder::Ascending> {
+ public:
+  template <typename Type>
+  bool operator()(const Type& lval, const Type& rval) {
+    return lval < rval;
+  }
+};
+
+template <>
+class SelectKComparator<SortOrder::Descending> {
+ public:
+  template <typename Type>
+  bool operator()(const Type& lval, const Type& rval) {
+    return rval < lval;
+  }
+};
+
+class ArraySelecter : public TypeVisitor {
+ public:
+  ArraySelecter(ExecContext* ctx, const Array& array, const SelectKOptions& options,
+                Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        array_(array),
+        k_(options.k),
+        order_(options.sort_keys[0].order),
+        physical_type_(GetPhysicalType(array.type())),
+        output_(output) {}
+
+  Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE)                                           \
+  Status Visit(const TYPE& type) {                            \
+    if (order_ == SortOrder::Ascending) {                     \
+      return SelectKthInternal<TYPE, SortOrder::Ascending>(); \
+    }                                                         \
+    return SelectKthInternal<TYPE, SortOrder::Descending>();  \
+  }
+
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+  template <typename InType, SortOrder sort_order>
+  Status SelectKthInternal() {
+    using GetView = GetViewType<InType>;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+
+    ArrayType arr(array_.data());
+    std::vector<uint64_t> indices(arr.length());
+
+    uint64_t* indices_begin = indices.data();
+    uint64_t* indices_end = indices_begin + indices.size();
+    std::iota(indices_begin, indices_end, 0);
+    if (k_ > arr.length()) {
+      k_ = arr.length();
+    }
+
+    const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+        indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+    const auto end_iter = p.non_nulls_end;
+
+    auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+    SelectKComparator<sort_order> comparator;
+    auto cmp = [&arr, &comparator](uint64_t left, uint64_t right) {
+      const auto lval = GetView::LogicalValue(arr.GetView(left));
+      const auto rval = GetView::LogicalValue(arr.GetView(right));
+      return comparator(lval, rval);
+    };
+    using HeapContainer =
+        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+    HeapContainer heap(indices_begin, kth_begin, cmp);
+    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+      uint64_t x_index = *iter;
+      if (cmp(x_index, heap.top())) {
+        heap.pop();
+        heap.push(x_index);
+      }
+    }
+    auto out_size = static_cast<int64_t>(heap.size());
+    ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(), out_size,
+                                                                    ctx_->memory_pool()));
+
+    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+    while (heap.size() > 0) {
+      *out_cbegin = heap.top();
+      heap.pop();
+      --out_cbegin;
+    }
+    *output_ = Datum(take_indices);
+    return Status::OK();
+  }
+
+  ExecContext* ctx_;
+  const Array& array_;
+  int64_t k_;
+  SortOrder order_;
+  const std::shared_ptr<DataType> physical_type_;
+  Datum* output_;
+};
+
+template <typename ArrayType>
+struct TypedHeapItem {
+  uint64_t index;
+  uint64_t offset;
+  ArrayType* array;
+};
+
+class ChunkedArraySelecter : public TypeVisitor {
+ public:
+  ChunkedArraySelecter(ExecContext* ctx, const ChunkedArray& chunked_array,
+                       const SelectKOptions& options, Datum* output)
+      : TypeVisitor(),
+        chunked_array_(chunked_array),
+        physical_type_(GetPhysicalType(chunked_array.type())),
+        physical_chunks_(GetPhysicalChunks(chunked_array_, physical_type_)),
+        k_(options.k),
+        order_(options.sort_keys[0].order),
+        ctx_(ctx),
+        output_(output) {}
+
+  Status Run() { return physical_type_->Accept(this); }
+
+#define VISIT(TYPE)                                           \
+  Status Visit(const TYPE& type) {                            \
+    if (order_ == SortOrder::Ascending) {                     \
+      return SelectKthInternal<TYPE, SortOrder::Ascending>(); \
+    }                                                         \
+    return SelectKthInternal<TYPE, SortOrder::Descending>();  \
+  }
+
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+#undef VISIT
+
+  template <typename InType, SortOrder sort_order>
+  Status SelectKthInternal() {
+    using GetView = GetViewType<InType>;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    using HeapItem = TypedHeapItem<ArrayType>;
+
+    const auto num_chunks = chunked_array_.num_chunks();
+    if (num_chunks == 0) {
+      return Status::OK();
+    }
+    if (k_ > chunked_array_.length()) {
+      k_ = chunked_array_.length();
+    }
+    std::function<bool(const HeapItem&, const HeapItem&)> cmp;
+    SelectKComparator<sort_order> comparator;
+
+    cmp = [&comparator](const HeapItem& left, const HeapItem& right) -> bool {
+      const auto lval = GetView::LogicalValue(left.array->GetView(left.index));
+      const auto rval = GetView::LogicalValue(right.array->GetView(right.index));
+      return comparator(lval, rval);
+    };
+    using HeapContainer =
+        std::priority_queue<HeapItem, std::vector<HeapItem>, decltype(cmp)>;
+
+    HeapContainer heap(cmp);
+    std::vector<std::shared_ptr<ArrayType>> chunks_holder;
+    uint64_t offset = 0;
+    for (const auto& chunk : physical_chunks_) {
+      if (chunk->length() == 0) continue;
+      chunks_holder.emplace_back(std::make_shared<ArrayType>(chunk->data()));
+      ArrayType& arr = *chunks_holder[chunks_holder.size() - 1];
+
+      std::vector<uint64_t> indices(arr.length());
+      uint64_t* indices_begin = indices.data();
+      uint64_t* indices_end = indices_begin + indices.size();
+      std::iota(indices_begin, indices_end, 0);
+
+      const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+          indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+      const auto end_iter = p.non_nulls_end;
+
+      auto kth_begin = std::min(indices_begin + k_, end_iter);
+      uint64_t* iter = indices_begin;
+      for (; iter != kth_begin && heap.size() < static_cast<size_t>(k_); ++iter) {
+        heap.push(HeapItem{*iter, offset, &arr});
+      }
+      for (; iter != end_iter && !heap.empty(); ++iter) {
+        uint64_t x_index = *iter;
+        const auto& xval = GetView::LogicalValue(arr.GetView(x_index));
+        auto top_item = heap.top();
+        const auto& top_value =
+            GetView::LogicalValue(top_item.array->GetView(top_item.index));
+        if (comparator(xval, top_value)) {
+          heap.pop();
+          heap.push(HeapItem{x_index, offset, &arr});
+        }
+      }
+      offset += chunk->length();
+    }
+
+    auto out_size = static_cast<int64_t>(heap.size());
+    ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(), out_size,
+                                                                    ctx_->memory_pool()));
+    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+    while (heap.size() > 0) {
+      auto top_item = heap.top();
+      *out_cbegin = top_item.index + top_item.offset;
+      heap.pop();
+      --out_cbegin;
+    }
+    *output_ = Datum(take_indices);
+    return Status::OK();
+  }
+
+  const ChunkedArray& chunked_array_;
+  const std::shared_ptr<DataType> physical_type_;
+  const ArrayVector physical_chunks_;
+  int64_t k_;
+  SortOrder order_;
+  ExecContext* ctx_;
+  Datum* output_;
+};
+
+class RecordBatchSelecter : public TypeVisitor {
+ private:
+  using ResolvedSortKey = MultipleKeyRecordBatchSorter::ResolvedSortKey;
+  using Comparator = MultipleKeyComparator<ResolvedSortKey>;
+
+ public:
+  RecordBatchSelecter(ExecContext* ctx, const RecordBatch& record_batch,
+                      const SelectKOptions& options, Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        record_batch_(record_batch),
+        k_(options.k),
+        output_(output),
+        sort_keys_(ResolveSortKeys(record_batch, options.sort_keys)),
+        comparator_(sort_keys_, NullPlacement::AtEnd) {}
+
+  Status Run() { return sort_keys_[0].type->Accept(this); }
+
+ protected:
+#define VISIT(TYPE)                                            \
+  Status Visit(const TYPE& type) {                             \
+    if (sort_keys_[0].order == SortOrder::Descending)          \
+      return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+    return SelectKthInternal<TYPE, SortOrder::Ascending>();    \
+  }
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+#undef VISIT
+
+  static std::vector<ResolvedSortKey> ResolveSortKeys(
+      const RecordBatch& batch, const std::vector<SortKey>& sort_keys) {
+    std::vector<ResolvedSortKey> resolved;
+    for (const auto& key : sort_keys) {
+      auto array = key.target.GetOne(batch).ValueOr(nullptr);
+      resolved.emplace_back(array, key.order);
+    }
+    return resolved;
+  }
+
+  template <typename InType, SortOrder sort_order>
+  Status SelectKthInternal() {
+    using GetView = GetViewType<InType>;
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    auto& comparator = comparator_;
+    const auto& first_sort_key = sort_keys_[0];
+    const auto& arr = checked_cast<const ArrayType&>(first_sort_key.array);
+
+    const auto num_rows = record_batch_.num_rows();
+    if (num_rows == 0) {
+      return Status::OK();
+    }
+    if (k_ > record_batch_.num_rows()) {
+      k_ = record_batch_.num_rows();
+    }
+    std::function<bool(const uint64_t&, const uint64_t&)> cmp;
+    SelectKComparator<sort_order> select_k_comparator;
+    cmp = [&](const uint64_t& left, const uint64_t& right) -> bool {
+      const auto lval = GetView::LogicalValue(arr.GetView(left));
+      const auto rval = GetView::LogicalValue(arr.GetView(right));
+      if (lval == rval) {
+        // If the left value equals the right value,
+        // we need to compare the second and following
+        // sort keys.
+        return comparator.Compare(left, right, 1);
+      }
+      return select_k_comparator(lval, rval);
+    };
+    using HeapContainer =
+        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+
+    std::vector<uint64_t> indices(arr.length());
+    uint64_t* indices_begin = indices.data();
+    uint64_t* indices_end = indices_begin + indices.size();
+    std::iota(indices_begin, indices_end, 0);
+
+    const auto p = PartitionNulls<ArrayType, NonStablePartitioner>(
+        indices_begin, indices_end, arr, 0, NullPlacement::AtEnd);
+    const auto end_iter = p.non_nulls_end;
+
+    auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+    HeapContainer heap(indices_begin, kth_begin, cmp);
+    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+      uint64_t x_index = *iter;
+      auto top_item = heap.top();
+      if (cmp(x_index, top_item)) {
+        heap.pop();
+        heap.push(x_index);
+      }
+    }
+    auto out_size = static_cast<int64_t>(heap.size());
+    ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(), out_size,
+                                                                    ctx_->memory_pool()));
+    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+    while (heap.size() > 0) {
+      *out_cbegin = heap.top();
+      heap.pop();
+      --out_cbegin;
+    }
+    *output_ = Datum(take_indices);
+    return Status::OK();
+  }
+
+  ExecContext* ctx_;
+  const RecordBatch& record_batch_;
+  int64_t k_;
+  Datum* output_;
+  std::vector<ResolvedSortKey> sort_keys_;
+  Comparator comparator_;
+};
+
+class TableSelecter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const std::shared_ptr<ChunkedArray>& chunked_array,
+                    const SortOrder order)
+        : order(order),
+          type(GetPhysicalType(chunked_array->type())),
+          chunks(GetPhysicalChunks(*chunked_array, type)),
+          null_count(chunked_array->null_count()),
+          resolver(GetArrayPointers(chunks)) {}
+
+    using LocationType = int64_t;
+
+    // Find the target chunk and index in the target chunk from an
+    // index in chunked array.
+    template <typename ArrayType>
+    ResolvedChunk<ArrayType> GetChunk(int64_t index) const {
+      return resolver.Resolve<ArrayType>(index);
+    }
+
+    const SortOrder order;
+    const std::shared_ptr<DataType> type;
+    const ArrayVector chunks;
+    const int64_t null_count;
+    const ChunkedArrayResolver resolver;
+  };
+  using Comparator = MultipleKeyComparator<ResolvedSortKey>;
+
+ public:
+  TableSelecter(ExecContext* ctx, const Table& table, const SelectKOptions& options,
+                Datum* output)
+      : TypeVisitor(),
+        ctx_(ctx),
+        table_(table),
+        k_(options.k),
+        output_(output),
+        sort_keys_(ResolveSortKeys(table, options.sort_keys)),
+        comparator_(sort_keys_, NullPlacement::AtEnd) {}
+
+  Status Run() { return sort_keys_[0].type->Accept(this); }
+
+ protected:
+#define VISIT(TYPE)                                            \
+  Status Visit(const TYPE& type) {                             \
+    if (sort_keys_[0].order == SortOrder::Descending)          \
+      return SelectKthInternal<TYPE, SortOrder::Descending>(); \
+    return SelectKthInternal<TYPE, SortOrder::Ascending>();    \
+  }
+  VISIT_SORTABLE_PHYSICAL_TYPES(VISIT)
+
+#undef VISIT
+
+  static std::vector<ResolvedSortKey> ResolveSortKeys(
+      const Table& table, const std::vector<SortKey>& sort_keys) {
+    std::vector<ResolvedSortKey> resolved;
+    for (const auto& key : sort_keys) {
+      auto chunked_array = GetTableColumn(table, key.target);
+      resolved.emplace_back(chunked_array, key.order);
+    }
+    return resolved;
+  }
+
+  // Behaves like PartitionNulls() but this supports multiple sort keys.
+  template <typename Type>
+  NullPartitionResult PartitionNullsInternal(uint64_t* indices_begin,
+                                             uint64_t* indices_end,
+                                             const ResolvedSortKey& first_sort_key) {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+
+    const auto p = PartitionNullsOnly<StablePartitioner>(
+        indices_begin, indices_end, first_sort_key.resolver, first_sort_key.null_count,
+        NullPlacement::AtEnd);
+    DCHECK_EQ(p.nulls_end - p.nulls_begin, first_sort_key.null_count);
+
+    const auto q = PartitionNullLikes<ArrayType, StablePartitioner>(
+        p.non_nulls_begin, p.non_nulls_end, first_sort_key.resolver,
+        NullPlacement::AtEnd);
+
+    auto& comparator = comparator_;
+    // Sort all NaNs by the second and following sort keys.
+    std::stable_sort(q.nulls_begin, q.nulls_end, [&](uint64_t left, uint64_t right) {
+      return comparator.Compare(left, right, 1);
+    });
+    // Sort all nulls by the second and following sort keys.
+    std::stable_sort(p.nulls_begin, p.nulls_end, [&](uint64_t left, uint64_t right) {
+      return comparator.Compare(left, right, 1);
+    });
+
+    return q;
+  }
+
+  // XXX this implementation is rather inefficient as it computes chunk indices
+  // at every comparison.  Instead we should iterate over individual batches
+  // and remember ChunkLocation entries in the max-heap.
+
+  template <typename InType, SortOrder sort_order>
+  Status SelectKthInternal() {
+    using ArrayType = typename TypeTraits<InType>::ArrayType;
+    auto& comparator = comparator_;
+    const auto& first_sort_key = sort_keys_[0];
+
+    const auto num_rows = table_.num_rows();
+    if (num_rows == 0) {
+      return Status::OK();
+    }
+    if (k_ > table_.num_rows()) {
+      k_ = table_.num_rows();
+    }
+    std::function<bool(const uint64_t&, const uint64_t&)> cmp;
+    SelectKComparator<sort_order> select_k_comparator;
+    cmp = [&](const uint64_t& left, const uint64_t& right) -> bool {
+      auto chunk_left = first_sort_key.template GetChunk<ArrayType>(left);
+      auto chunk_right = first_sort_key.template GetChunk<ArrayType>(right);
+      auto value_left = chunk_left.Value();
+      auto value_right = chunk_right.Value();
+      if (value_left == value_right) {
+        return comparator.Compare(left, right, 1);
+      }
+      return select_k_comparator(value_left, value_right);
+    };
+    using HeapContainer =
+        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
+
+    std::vector<uint64_t> indices(num_rows);
+    uint64_t* indices_begin = indices.data();
+    uint64_t* indices_end = indices_begin + indices.size();
+    std::iota(indices_begin, indices_end, 0);
+
+    const auto p =
+        this->PartitionNullsInternal<InType>(indices_begin, indices_end, first_sort_key);
+    const auto end_iter = p.non_nulls_end;
+    auto kth_begin = std::min(indices_begin + k_, end_iter);
+
+    HeapContainer heap(indices_begin, kth_begin, cmp);
+    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
+      uint64_t x_index = *iter;
+      uint64_t top_item = heap.top();
+      if (cmp(x_index, top_item)) {
+        heap.pop();
+        heap.push(x_index);
+      }
+    }
+    auto out_size = static_cast<int64_t>(heap.size());
+    ARROW_ASSIGN_OR_RAISE(auto take_indices, MakeMutableUInt64Array(uint64(), out_size,
+                                                                    ctx_->memory_pool()));
+    auto* out_cbegin = take_indices->GetMutableValues<uint64_t>(1) + out_size - 1;
+    while (heap.size() > 0) {
+      *out_cbegin = heap.top();
+      heap.pop();
+      --out_cbegin;
+    }
+    *output_ = Datum(take_indices);
+    return Status::OK();
+  }
+
+  ExecContext* ctx_;
+  const Table& table_;
+  int64_t k_;
+  Datum* output_;
+  std::vector<ResolvedSortKey> sort_keys_;
+  Comparator comparator_;
+};
+
+static Status CheckConsistency(const Schema& schema,
+                               const std::vector<SortKey>& sort_keys) {
+  for (const auto& key : sort_keys) {
+    RETURN_NOT_OK(CheckNonNested(key.target));
+    RETURN_NOT_OK(PrependInvalidColumn(key.target.FindOne(schema)));
+  }
+  return Status::OK();
+}
+
+class SelectKUnstableMetaFunction : public MetaFunction {
+ public:
+  SelectKUnstableMetaFunction()
+      : MetaFunction("select_k_unstable", Arity::Unary(), select_k_unstable_doc,
+                     GetDefaultSelectKOptions()) {}
+
+  Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
+                            const FunctionOptions* options,
+                            ExecContext* ctx) const override {
+    const auto& select_k_options = static_cast<const SelectKOptions&>(*options);
+    if (select_k_options.k < 0) {
+      return Status::Invalid("select_k_unstable requires a nonnegative `k`, got ",
+                             select_k_options.k);
+    }
+    if (select_k_options.sort_keys.size() == 0) {
+      return Status::Invalid("select_k_unstable requires a non-empty `sort_keys`");
+    }
+    switch (args[0].kind()) {
+      case Datum::ARRAY:
+        return SelectKth(*args[0].make_array(), select_k_options, ctx);
+      case Datum::CHUNKED_ARRAY:
+        return SelectKth(*args[0].chunked_array(), select_k_options, ctx);
+      case Datum::RECORD_BATCH:
+        return SelectKth(*args[0].record_batch(), select_k_options, ctx);
+      case Datum::TABLE:
+        return SelectKth(*args[0].table(), select_k_options, ctx);
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for select_k operation: "
+        "values=",
+        args[0].ToString());
+  }
+
+ private:
+  Result<Datum> SelectKth(const Array& array, const SelectKOptions& options,
+                          ExecContext* ctx) const {
+    Datum output;
+    ArraySelecter selecter(ctx, array, options, &output);
+    ARROW_RETURN_NOT_OK(selecter.Run());
+    return output;
+  }
+
+  Result<Datum> SelectKth(const ChunkedArray& chunked_array,
+                          const SelectKOptions& options, ExecContext* ctx) const {
+    Datum output;
+    ChunkedArraySelecter selecter(ctx, chunked_array, options, &output);
+    ARROW_RETURN_NOT_OK(selecter.Run());
+    return output;
+  }
+  Result<Datum> SelectKth(const RecordBatch& record_batch, const SelectKOptions& options,
+                          ExecContext* ctx) const {
+    ARROW_RETURN_NOT_OK(CheckConsistency(*record_batch.schema(), options.sort_keys));
+    Datum output;
+    RecordBatchSelecter selecter(ctx, record_batch, options, &output);
+    ARROW_RETURN_NOT_OK(selecter.Run());
+    return output;
+  }
+  Result<Datum> SelectKth(const Table& table, const SelectKOptions& options,
+                          ExecContext* ctx) const {
+    ARROW_RETURN_NOT_OK(CheckConsistency(*table.schema(), options.sort_keys));
+    Datum output;
+    TableSelecter selecter(ctx, table, options, &output);
+    ARROW_RETURN_NOT_OK(selecter.Run());
+    return output;
+  }
+};
+
+// ----------------------------------------------------------------------
+// Rank implementation

Review Comment:
   Since you're moving the Rank implementation(s) already, might as well move them to a dedicated `vector_rank.cc`?
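
For readers following the hunk above, here is a minimal standalone sketch of the bounded-heap selection pattern it uses. It assumes that `HeapContainer` behaves like a `std::priority_queue` over row indices; `SelectKSmallestIndices` and its plain `std::vector<int>` input are hypothetical names for illustration only, and null partitioning and multi-key comparison are left out.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <queue>
#include <vector>

// Hypothetical helper (not part of Arrow): returns the indices of the k
// smallest values, ordered by ascending value.
std::vector<uint64_t> SelectKSmallestIndices(const std::vector<int>& values,
                                             std::size_t k) {
  std::vector<uint64_t> indices(values.size());
  std::iota(indices.begin(), indices.end(), 0);

  // cmp(a, b) is true when row a should come before row b in the output.
  auto cmp = [&values](uint64_t a, uint64_t b) { return values[a] < values[b]; };

  // With a "less" comparator, the top of the priority queue is the worst
  // (largest) of the candidates currently kept.
  const std::size_t heap_size = std::min(k, indices.size());
  std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)> heap(
      indices.begin(), indices.begin() + static_cast<std::ptrdiff_t>(heap_size), cmp);

  // Scan the remaining rows; replace the worst candidate whenever a better
  // row is found, so the heap never grows beyond k entries.
  for (std::size_t i = heap_size; i < indices.size() && !heap.empty(); ++i) {
    if (cmp(indices[i], heap.top())) {
      heap.pop();
      heap.push(indices[i]);
    }
  }

  // Popping yields worst-first, so fill the output from the back.
  std::vector<uint64_t> out(heap.size());
  for (auto it = out.rbegin(); it != out.rend(); ++it) {
    *it = heap.top();
    heap.pop();
  }
  return out;
}
```

Because the heap holds at most `k` entries, the scan costs roughly O(n log k) comparisons rather than the O(n log n) of a full sort. Ties are broken arbitrarily, which is consistent with the "unstable" in the function name.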



##########
cpp/src/arrow/compute/kernels/vector_sort_internal.h:
##########
@@ -456,6 +458,402 @@ Status SortChunkedArray(ExecContext* ctx, uint64_t* indices_begin, uint64_t* ind
                         const ChunkedArray& values, SortOrder sort_order,
                         NullPlacement null_placement);
 
+// ----------------------------------------------------------------------
+// Helpers for Sort/SelectK implementations
+
+struct SortField {
+  int field_index;
+  SortOrder order;
+};
+
+inline Status CheckNonNested(const FieldRef& ref) {
+  if (ref.IsNested()) {
+    return Status::KeyError("Nested keys not supported for SortKeys");
+  }
+  return Status::OK();
+}
+
+template <typename T>
+Result<T> PrependInvalidColumn(Result<T> res) {
+  if (res.ok()) return res;
+  return res.status().WithMessage("Invalid sort key column: ", res.status().message());
+}
+
+// Return the field indices of the sort keys, deduplicating them along the way
+inline Result<std::vector<SortField>> FindSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys) {
+  std::vector<SortField> fields;
+  std::unordered_set<int> seen;
+  fields.reserve(sort_keys.size());
+  seen.reserve(sort_keys.size());
+
+  for (const auto& sort_key : sort_keys) {
+    RETURN_NOT_OK(CheckNonNested(sort_key.target));
+
+    ARROW_ASSIGN_OR_RAISE(auto match,
+                          PrependInvalidColumn(sort_key.target.FindOne(schema)));
+    if (seen.insert(match[0]).second) {
+      fields.push_back({match[0], sort_key.order});
+    }
+  }
+  return fields;
+}
+
+template <typename ResolvedSortKey, typename ResolvedSortKeyFactory>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const Schema& schema, const std::vector<SortKey>& sort_keys,
+    ResolvedSortKeyFactory&& factory) {
+  ARROW_ASSIGN_OR_RAISE(const auto fields, FindSortKeys(schema, sort_keys));
+  std::vector<ResolvedSortKey> resolved;
+  resolved.reserve(fields.size());
+  std::transform(fields.begin(), fields.end(), std::back_inserter(resolved), factory);
+  return resolved;
+}
+
+template <typename ResolvedSortKey, typename TableOrBatch>
+Result<std::vector<ResolvedSortKey>> ResolveSortKeys(
+    const TableOrBatch& table_or_batch, const std::vector<SortKey>& sort_keys) {
+  return ResolveSortKeys<ResolvedSortKey>(
+      *table_or_batch.schema(), sort_keys, [&](const SortField& f) {
+        return ResolvedSortKey{table_or_batch.column(f.field_index), f.order};
+      });
+}
+
+// Returns nullptr if no column matching `ref` is found, or if the FieldRef is
+// a nested reference.
+inline std::shared_ptr<ChunkedArray> GetTableColumn(const Table& table,

Review Comment:
   Hmm... `FieldPath` and `FieldRef` already support looking up a column from a RecordBatch, I think it would be easy to add similar support for Table there.
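
As a rough illustration of what the quoted `FindSortKeys` helper does (resolve each key to a column index and keep only the first occurrence of each column), here is a library-free sketch. The types `Key` and `Field`, the `Order` enum, and the function `FindKeys` are hypothetical stand-ins, and a plain name lookup replaces `FieldRef::FindOne`.

```cpp
#include <cstddef>
#include <string>
#include <unordered_set>
#include <vector>

enum class Order { Ascending, Descending };

// Hypothetical stand-ins for SortKey and SortField.
struct Key {
  std::string name;
  Order order;
};
struct Field {
  int field_index;
  Order order;
};

// Resolves each key against `column_names`, keeping only the first key that
// refers to a given column. Returns false if any key names a missing column.
bool FindKeys(const std::vector<std::string>& column_names,
              const std::vector<Key>& keys, std::vector<Field>* out) {
  std::unordered_set<int> seen;
  out->clear();
  out->reserve(keys.size());
  for (const Key& key : keys) {
    int index = -1;
    for (std::size_t i = 0; i < column_names.size(); ++i) {
      if (column_names[i] == key.name) {
        index = static_cast<int>(i);
        break;
      }
    }
    if (index < 0) return false;  // analogous to "Invalid sort key column"
    // insert().second is true only the first time this index is seen.
    if (seen.insert(index).second) {
      out->push_back({index, key.order});
    }
  }
  return true;
}
```

With columns `a, b, c` and keys `b` ascending, `a` descending, `b` descending, the second `b` key is dropped. A later key on the same column could never change the ordering, since the earlier one already decides every comparison on that column.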
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #14753: GH-33559: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1380365502

   * Closes: #33559


[GitHub] [arrow] ursabot commented on pull request #14753: GH-33559: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1381527528

   Benchmark runs are scheduled for baseline = df2aa384eedc6dfd2a816f66e3ef8af1ecda3e8f and contender = 345f5e18a0252c298d7e0f09f787c63ea9ba8eaa. 345f5e18a0252c298d7e0f09f787c63ea9ba8eaa is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/954f278a036843789c3d44d0071f148f...2535f9393f094faaaa51cae958e2876a/)
   [Finished :arrow_down:1.29% :arrow_up:0.06%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/19d9be025a114580978aec5be3ff1530...ec5f0d5b816942189e0601b19ed79bf2/)
   [Finished :arrow_down:0.51% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/e027839b7c584255ad65d1cf60330b90...15ff2befaab2433289cabe76a8aeb1ea/)
   [Finished :arrow_down:4.49% :arrow_up:0.65%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/fd042d57fc7f4bb5a37aa51a52bfeec7...f254a4288f35418286dda1262173bdf9/)
   Buildkite builds:
   [Finished] [`345f5e18` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2164)
   [Finished] [`345f5e18` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2187)
   [Finished] [`345f5e18` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2159)
   [Finished] [`345f5e18` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2180)
   [Finished] [`df2aa384` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2163)
   [Finished] [`df2aa384` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2186)
   [Finished] [`df2aa384` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2158)
   [Finished] [`df2aa384` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2179)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


[GitHub] [arrow] github-actions[bot] commented on pull request #14753: GH-33559: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1380365552

   :warning: GitHub issue #33559 **has been automatically assigned in GitHub** to PR creator.


[GitHub] [arrow] pitrou merged pull request #14753: GH-33559: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
pitrou merged PR #14753:
URL: https://github.com/apache/arrow/pull/14753


[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1375962480

   @pitrou After looking deeper, what I said earlier isn't entirely correct. Nested fields are only accepted in a few cases where `FindSortKeys` isn't called. I tightened those up in the latest commit. There were also some inconsistencies in error reporting for missing/invalid `FieldRef`s (particularly in the select-k module), so I attempted to fix those as well.
   
   Also, due to the error message changes, one of the Python tests is failing in the AppVeyor build. I figured I'd wait for your opinion before fixing it, but I suspect you'd agree that the new error diagnostics are more informative.


[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1371273417

   Yeah, makes sense. I can save the FieldRef/Path stuff for a later date.
   
   > we would have to decide whether to take into account outer-level nulls or not
   
   That was a point of uncertainty for me as well. My implementation of `FieldPath::FindAll` for `Table`s has child fields inherit their parent's validity via flattening, but the existing versions just use their underlying `ArrayData` directly (most likely to avoid allocating a new `Array`?), which would raise even more questions.
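
To make the two behaviours discussed above concrete, here is a small sketch that models a struct column with one child field using plain validity flags. `StructColumn`, `FlattenedChildValidity` and `DirectChildValidity` are hypothetical names for illustration, not Arrow APIs. "Flattened" means the child inherits the parent's nulls, while "direct" reads the child's own validity and ignores the parent.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model: per-row validity flags for a struct column (the
// parent) and for one of its child fields. Both vectors have equal length.
struct StructColumn {
  std::vector<bool> parent_valid;
  std::vector<bool> child_valid;
};

// Flattened view: a child value counts as valid only when both the child
// slot and the enclosing struct slot are valid.
std::vector<bool> FlattenedChildValidity(const StructColumn& col) {
  std::vector<bool> out(col.child_valid.size());
  for (std::size_t i = 0; i < out.size(); ++i) {
    out[i] = col.parent_valid[i] && col.child_valid[i];
  }
  return out;
}

// Direct view: the child's own validity, with outer-level nulls ignored.
std::vector<bool> DirectChildValidity(const StructColumn& col) {
  return col.child_valid;
}
```

For a row where the parent is null but the child slot is marked valid, the two views disagree, and a sort on the child field would place that row among the nulls in one case and among the values in the other. That is the decision the quoted remark refers to.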


[GitHub] [arrow] github-actions[bot] commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1329449759

   https://issues.apache.org/jira/browse/ARROW-18395


[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1349092288

   @pitrou Yes. In fact, I'm working on it right now.


[GitHub] [arrow] benibus commented on pull request #14753: ARROW-18395: [C++] Move select-k implementation into separate module

Posted by GitBox <gi...@apache.org>.
benibus commented on PR #14753:
URL: https://github.com/apache/arrow/pull/14753#issuecomment-1349609917

   I'm not quite sure whether you think exposing the one common `ResolvedSortKey` in the header would be worthwhile, since I basically just copied it into select-k's `RecordBatchSelecter`. In practice, there are 3 unique `ResolvedSortKey` definitions across 4 classes.

