You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/11 15:36:25 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #8612: ARROW-8199: [C++] Add support for multi-column sort indices on Table

pitrou commented on a change in pull request #8612:
URL: https://github.com/apache/arrow/pull/8612#discussion_r521405090



##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -58,6 +58,34 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions {
   static TakeOptions Defaults() { return BoundsCheck(); }
 };
 
+enum SortOrder {

Review comment:
       Make this a `enum class`?

##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -58,6 +58,34 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions {
   static TakeOptions Defaults() { return BoundsCheck(); }
 };
 
+enum SortOrder {
+  ASCENDING,
+  DESCENDING,

Review comment:
       Nit, but sometimes all-uppercase values conflict with some C macros (especially on Windows). `Ascending` perhaps?

##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -165,8 +194,50 @@ Result<std::shared_ptr<Array>> NthToIndices(const Array& values, int64_t n,
 /// \param[in] ctx the function execution context, optional
 /// \return offsets indices that would sort an array
 ARROW_EXPORT
-Result<std::shared_ptr<Array>> SortToIndices(const Array& values,
-                                             ExecContext* ctx = NULLPTR);
+Result<std::shared_ptr<Array>> SortIndices(const Array& values,
+                                           ExecContext* ctx = NULLPTR);
+
+/// \brief Returns the indices that would sort an array in the
+/// specified order.
+///
+/// Perform an indirect sort of array. The output array will contain
+/// indices that would sort an array, which would be the same length
+/// as input. Nulls will be stably partitioned to the end of the output.

Review comment:
       Add "regardless of order"?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;

Review comment:
       `int64_t`?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;
+    std::vector<Array*> chunks;
+  };
+
+  class Comparer : public TypeVisitor {
+   public:
+    Comparer(const Table& table, const std::vector<SortKey>& sort_keys)
+        : TypeVisitor(), status_(Status::OK()) {
+      for (const auto& sort_key : sort_keys) {
+        const auto& chunked_array = table.GetColumnByName(sort_key.name);
+        if (!chunked_array) {
+          status_ = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+          return;
+        }
+        sort_keys_.emplace_back(*chunked_array, sort_key.order);
+      }
+    }
+
+    Status status() { return status_; }
+
+    const std::vector<ResolvedSortKey>& sort_keys() { return sort_keys_; }
+
+    bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {
+      current_left_ = left;
+      current_right_ = right;
+      current_compared_ = 0;
+      auto num_sort_keys = sort_keys_.size();
+      for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+        current_sort_key_index_ = i;
+        status_ = sort_keys_[i].type->Accept(this);
+        if (current_compared_ != 0) {
+          break;
+        }
+      }
+      return current_compared_ < 0;
+    }
+
+#define VISIT(TYPE)                                \
+  Status Visit(const TYPE##Type& type) override {  \
+    current_compared_ = CompareType<TYPE##Type>(); \
+    return Status::OK();                           \
+  }
+
+    VISIT(Int8)
+    VISIT(Int16)
+    VISIT(Int32)
+    VISIT(Int64)
+    VISIT(UInt8)
+    VISIT(UInt16)
+    VISIT(UInt32)
+    VISIT(UInt64)
+    VISIT(Float)
+    VISIT(Double)
+    VISIT(String)
+    VISIT(Binary)
+    VISIT(LargeString)
+    VISIT(LargeBinary)
+
+#undef VISIT
+
+   private:
+    template <typename Type>
+    int32_t CompareType() {
+      using ArrayType = typename TypeTraits<Type>::ArrayType;
+      const auto& sort_key = sort_keys_[current_sort_key_index_];
+      auto order = sort_key.order;
+      int64_t index_left = 0;
+      auto array_left = sort_key.ResolveChunk<ArrayType>(current_left_, index_left);
+      int64_t index_right = 0;
+      auto array_right = sort_key.ResolveChunk<ArrayType>(current_right_, index_right);
+      if (sort_key.null_count > 0) {
+        auto is_null_left = array_left->IsNull(index_left);
+        auto is_null_right = array_right->IsNull(index_right);
+        if (is_null_left && is_null_right) {
+          return 0;
+        } else if (is_null_left) {
+          return 1;
+        } else if (is_null_right) {
+          return -1;
+        }
+      }
+      auto left = array_left->GetView(index_left);
+      auto right = array_right->GetView(index_right);
+      int32_t compared;
+      if (left == right) {
+        compared = 0;
+      } else if (left > right) {
+        compared = 1;
+      } else {
+        compared = -1;
+      }
+      if (order == SortOrder::DESCENDING) {
+        compared = -compared;
+      }
+      return compared;
+    }
+
+    Status status_;
+    std::vector<ResolvedSortKey> sort_keys_;
+    int64_t current_left_;
+    int64_t current_right_;
+    size_t current_sort_key_index_;
+    int32_t current_compared_;
+  };
+
+ public:
+  TableSorter(uint64_t* indices_begin, uint64_t* indices_end, const Table& table,
+              const SortOptions& options)
+      : indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        comparer_(table, options.sort_keys) {}
+
+  Status Sort() {
+    ARROW_RETURN_NOT_OK(comparer_.status());
+    return comparer_.sort_keys()[0].type->Accept(this);
+  }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE##Type& type) override { return SortInternal<TYPE##Type>(); }
+
+  VISIT(Int8)
+  VISIT(Int16)
+  VISIT(Int32)
+  VISIT(Int64)
+  VISIT(UInt8)
+  VISIT(UInt16)
+  VISIT(UInt32)
+  VISIT(UInt64)
+  VISIT(Float)
+  VISIT(Double)
+  VISIT(String)
+  VISIT(Binary)
+  VISIT(LargeString)
+  VISIT(LargeBinary)
+
+#undef VISIT
+
+ private:
+  template <typename Type>
+  Status SortInternal() {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    std::iota(indices_begin_, indices_end_, 0);
+
+    auto& comparer = comparer_;
+    const auto& first_sort_key = comparer.sort_keys()[0];
+    auto nulls_begin = indices_end_;
+    nulls_begin = PartitionNullsInternal<Type>(first_sort_key);
+    std::stable_sort(
+        indices_begin_, nulls_begin,
+        [&first_sort_key, &comparer](uint64_t left, uint64_t right) {
+          int64_t index_left = 0;
+          auto array_left = first_sort_key.ResolveChunk<ArrayType>(left, index_left);
+          int64_t index_right = 0;
+          auto array_right = first_sort_key.ResolveChunk<ArrayType>(right, index_right);
+          auto value_left = array_left->GetView(index_left);
+          auto value_right = array_right->GetView(index_right);
+          if (value_left == value_right) {
+            return comparer.Compare(left, right, 1);
+          } else {
+            auto compared = value_left < value_right;
+            if (first_sort_key.order == SortOrder::ASCENDING) {
+              return compared;
+            } else {
+              return !compared;
+            }
+          }
+        });
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_t<!is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
+      const ResolvedSortKey& first_sort_key) {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    if (first_sort_key.null_count == 0) {
+      return indices_end_;
+    }
+    StablePartitioner partitioner;
+    auto nulls_begin =
+        partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+          int64_t index_chunk = 0;
+          auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+          return !chunk->IsNull(index_chunk);
+        });
+    auto& comparer = comparer_;
+    std::stable_sort(nulls_begin, indices_end_,
+                     [&comparer](uint64_t left, uint64_t right) {
+                       return comparer.Compare(left, right, 1);
+                     });
+    return nulls_begin;
+  }
+
+  template <typename Type>
+  enable_if_t<is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
+      const ResolvedSortKey& first_sort_key) {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    StablePartitioner partitioner;
+    if (first_sort_key.null_count == 0) {
+      return partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+        int64_t index_chunk = 0;
+        auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+        return !std::isnan(chunk->GetView(index_chunk));
+      });
+    }
+    auto nans_and_nulls_begin =
+        partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+          int64_t index_chunk = 0;
+          auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+          return !chunk->IsNull(index_chunk) && !std::isnan(chunk->GetView(index_chunk));
+        });
+    auto nulls_begin = nans_and_nulls_begin;
+    if (first_sort_key.null_count < static_cast<int64_t>(indices_end_ - nulls_begin)) {
+      // move Nulls after NaN
+      nulls_begin = partitioner(
+          nans_and_nulls_begin, indices_end_, [&first_sort_key](uint64_t index) {
+            int64_t index_chunk = 0;
+            auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+            return !chunk->IsNull(index_chunk);
+          });
+    }
+    auto& comparer = comparer_;
+    if (nans_and_nulls_begin != nulls_begin) {
+      std::stable_sort(nans_and_nulls_begin, nulls_begin,
+                       [&comparer](uint64_t left, uint64_t right) {
+                         return comparer.Compare(left, right, 1);
+                       });
+    }
+    std::stable_sort(nulls_begin, indices_end_,
+                     [&comparer](uint64_t left, uint64_t right) {
+                       return comparer.Compare(left, right, 1);
+                     });
+    return nans_and_nulls_begin;
+  }
+
+  uint64_t* indices_begin_;
+  uint64_t* indices_end_;
+  Comparer comparer_;
+};
+
 const FunctionDoc sort_indices_doc(
+    "Return the indices that would sort an array, record batch or table",
+    ("This function computes an array of indices that define a stable sort\n"
+     "of the input array, record batch or table.  Null values are considered\n"
+     "greater than any other value and are therefore sorted at the end of the\n"
+     "input. For floating-point types, NaNs are considered greater than any\n"
+     "other non-null value, but smaller than null values."),
+    {"input"}, "SortOptions");
+
+class SortIndicesMetaFunction : public MetaFunction {
+ public:
+  SortIndicesMetaFunction()
+      : MetaFunction("sort_indices", Arity::Unary(), &sort_indices_doc) {}
+
+  Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
+                            const FunctionOptions* options,
+                            ExecContext* ctx) const override {
+    const SortOptions& sort_options = static_cast<const SortOptions&>(*options);
+    switch (args[0].kind()) {
+      case Datum::ARRAY:
+        return SortIndices(*args[0].make_array(), sort_options, ctx);
+        break;
+      case Datum::CHUNKED_ARRAY:
+        return SortIndices(*args[0].chunked_array(), sort_options, ctx);
+        break;
+      case Datum::RECORD_BATCH: {
+        ARROW_ASSIGN_OR_RAISE(auto table,
+                              Table::FromRecordBatches({args[0].record_batch()}));
+        return SortIndices(*table, sort_options, ctx);
+      } break;
+      case Datum::TABLE:
+        return SortIndices(*args[0].table(), sort_options, ctx);
+        break;
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for sort_indices operation: "
+        "values=",
+        args[0].ToString());
+  }
+
+ private:
+  Result<std::shared_ptr<Array>> SortIndices(const Array& values,
+                                             const SortOptions& options,
+                                             ExecContext* ctx) const {
+    SortOrder order = SortOrder::ASCENDING;
+    if (!options.sort_keys.empty()) {
+      order = options.sort_keys[0].order;
+    }
+    ArraySortOptions array_options(order);
+    ARROW_ASSIGN_OR_RAISE(
+        Datum result, CallFunction("array_sort_indices", {values}, &array_options, ctx));
+    return result.make_array();
+  }
+
+  Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& values,
+                                             const SortOptions& options,
+                                             ExecContext* ctx) const {
+    SortOrder order = SortOrder::ASCENDING;
+    if (!options.sort_keys.empty()) {
+      order = options.sort_keys[0].order;
+    }
+    ArraySortOptions array_options(order);
+
+    std::shared_ptr<Array> array_values;
+    if (values.num_chunks() == 1) {
+      array_values = values.chunk(0);
+    } else {
+      ARROW_ASSIGN_OR_RAISE(array_values,
+                            Concatenate(values.chunks(), ctx->memory_pool()));

Review comment:
       This is weird. Why are you concatenating chunks for the ChunkedArray case but not for the Table case? I would expect both cases to be handled similarly (regardless of the solution).

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;
+    std::vector<Array*> chunks;
+  };
+
+  class Comparer : public TypeVisitor {
+   public:
+    Comparer(const Table& table, const std::vector<SortKey>& sort_keys)
+        : TypeVisitor(), status_(Status::OK()) {
+      for (const auto& sort_key : sort_keys) {
+        const auto& chunked_array = table.GetColumnByName(sort_key.name);
+        if (!chunked_array) {
+          status_ = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+          return;
+        }
+        sort_keys_.emplace_back(*chunked_array, sort_key.order);
+      }
+    }
+
+    Status status() { return status_; }
+
+    const std::vector<ResolvedSortKey>& sort_keys() { return sort_keys_; }
+
+    bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {
+      current_left_ = left;
+      current_right_ = right;
+      current_compared_ = 0;
+      auto num_sort_keys = sort_keys_.size();
+      for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+        current_sort_key_index_ = i;
+        status_ = sort_keys_[i].type->Accept(this);
+        if (current_compared_ != 0) {
+          break;
+        }
+      }
+      return current_compared_ < 0;
+    }
+
+#define VISIT(TYPE)                                \
+  Status Visit(const TYPE##Type& type) override {  \
+    current_compared_ = CompareType<TYPE##Type>(); \
+    return Status::OK();                           \
+  }
+
+    VISIT(Int8)
+    VISIT(Int16)
+    VISIT(Int32)
+    VISIT(Int64)
+    VISIT(UInt8)
+    VISIT(UInt16)
+    VISIT(UInt32)
+    VISIT(UInt64)
+    VISIT(Float)
+    VISIT(Double)
+    VISIT(String)
+    VISIT(Binary)
+    VISIT(LargeString)
+    VISIT(LargeBinary)
+
+#undef VISIT
+
+   private:
+    template <typename Type>
+    int32_t CompareType() {
+      using ArrayType = typename TypeTraits<Type>::ArrayType;
+      const auto& sort_key = sort_keys_[current_sort_key_index_];
+      auto order = sort_key.order;
+      int64_t index_left = 0;
+      auto array_left = sort_key.ResolveChunk<ArrayType>(current_left_, index_left);
+      int64_t index_right = 0;
+      auto array_right = sort_key.ResolveChunk<ArrayType>(current_right_, index_right);
+      if (sort_key.null_count > 0) {
+        auto is_null_left = array_left->IsNull(index_left);
+        auto is_null_right = array_right->IsNull(index_right);
+        if (is_null_left && is_null_right) {
+          return 0;
+        } else if (is_null_left) {
+          return 1;
+        } else if (is_null_right) {
+          return -1;
+        }
+      }
+      auto left = array_left->GetView(index_left);
+      auto right = array_right->GetView(index_right);
+      int32_t compared;
+      if (left == right) {
+        compared = 0;
+      } else if (left > right) {
+        compared = 1;
+      } else {
+        compared = -1;
+      }
+      if (order == SortOrder::DESCENDING) {
+        compared = -compared;
+      }
+      return compared;
+    }
+
+    Status status_;
+    std::vector<ResolvedSortKey> sort_keys_;
+    int64_t current_left_;
+    int64_t current_right_;
+    size_t current_sort_key_index_;
+    int32_t current_compared_;
+  };
+
+ public:
+  TableSorter(uint64_t* indices_begin, uint64_t* indices_end, const Table& table,
+              const SortOptions& options)
+      : indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        comparer_(table, options.sort_keys) {}
+
+  Status Sort() {
+    ARROW_RETURN_NOT_OK(comparer_.status());
+    return comparer_.sort_keys()[0].type->Accept(this);
+  }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE##Type& type) override { return SortInternal<TYPE##Type>(); }
+
+  VISIT(Int8)
+  VISIT(Int16)
+  VISIT(Int32)
+  VISIT(Int64)
+  VISIT(UInt8)
+  VISIT(UInt16)
+  VISIT(UInt32)
+  VISIT(UInt64)
+  VISIT(Float)
+  VISIT(Double)
+  VISIT(String)
+  VISIT(Binary)
+  VISIT(LargeString)
+  VISIT(LargeBinary)
+
+#undef VISIT
+
+ private:
+  template <typename Type>
+  Status SortInternal() {

Review comment:
       Can you add comments when you define a non-trivial class or method?

##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -58,6 +58,34 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions {
   static TakeOptions Defaults() { return BoundsCheck(); }
 };
 
+enum SortOrder {
+  ASCENDING,
+  DESCENDING,
+};
+
+/// \brief One sort key for PartitionNthIndices (TODO) and SortIndices
+struct ARROW_EXPORT SortKey {
+  explicit SortKey(std::string name, SortOrder order = ASCENDING)
+      : name(name), order(order) {}
+
+  /// The name of the sort key.
+  std::string name;
+  /// How to order by this sort key.
+  SortOrder order;
+};
+
+struct ARROW_EXPORT ArraySortOptions : public FunctionOptions {
+  explicit ArraySortOptions(SortOrder order = SortOrder::ASCENDING) : order(order) {}
+
+  SortOrder order;
+};
+
+struct ARROW_EXPORT SortOptions : public FunctionOptions {

Review comment:
       Perhaps name this "TableSortOptions"?

##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -58,6 +58,34 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions {
   static TakeOptions Defaults() { return BoundsCheck(); }
 };
 
+enum SortOrder {
+  ASCENDING,
+  DESCENDING,
+};
+
+/// \brief One sort key for PartitionNthIndices (TODO) and SortIndices
+struct ARROW_EXPORT SortKey {
+  explicit SortKey(std::string name, SortOrder order = ASCENDING)
+      : name(name), order(order) {}
+
+  /// The name of the sort key.

Review comment:
       You mean sort column?

##########
File path: cpp/src/arrow/compute/api_vector.h
##########
@@ -165,8 +194,50 @@ Result<std::shared_ptr<Array>> NthToIndices(const Array& values, int64_t n,
 /// \param[in] ctx the function execution context, optional
 /// \return offsets indices that would sort an array
 ARROW_EXPORT
-Result<std::shared_ptr<Array>> SortToIndices(const Array& values,
-                                             ExecContext* ctx = NULLPTR);
+Result<std::shared_ptr<Array>> SortIndices(const Array& values,
+                                           ExecContext* ctx = NULLPTR);
+
+/// \brief Returns the indices that would sort an array in the
+/// specified order.
+///
+/// Perform an indirect sort of array. The output array will contain
+/// indices that would sort an array, which would be the same length
+/// as input. Nulls will be stably partitioned to the end of the output.
+///
+/// For example given values = [null, 1, 3.3, null, 2, 5.3] and order
+/// = SortOrder::DESCENDING, the output will be [5, 2, 4, 1, 0,
+/// 3].
+///
+/// \param[in] values array to sort
+/// \param[in] order ascending or descending
+/// \param[in] ctx the function execution context, optional
+/// \return offsets indices that would sort an array
+ARROW_EXPORT
+Result<std::shared_ptr<Array>> SortIndices(const Array& values, SortOrder order,

Review comment:
       Why not `order = SortOrder::Ascending`? This would avoid declaring two separate overloads.

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {

Review comment:
       Can you add a comment describing what this method does?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;
+    std::vector<Array*> chunks;
+  };
+
+  class Comparer : public TypeVisitor {
+   public:
+    Comparer(const Table& table, const std::vector<SortKey>& sort_keys)
+        : TypeVisitor(), status_(Status::OK()) {
+      for (const auto& sort_key : sort_keys) {
+        const auto& chunked_array = table.GetColumnByName(sort_key.name);
+        if (!chunked_array) {
+          status_ = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+          return;
+        }
+        sort_keys_.emplace_back(*chunked_array, sort_key.order);
+      }
+    }
+
+    Status status() { return status_; }
+
+    const std::vector<ResolvedSortKey>& sort_keys() { return sort_keys_; }
+
+    bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {

Review comment:
       Add a comment?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort_test.cc
##########
@@ -364,32 +400,264 @@ TYPED_TEST(TestSortToIndicesKernelRandomCount, SortRandomValuesCount) {
   int range = 2000;
   for (int test = 0; test < times; test++) {
     for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
-      auto array = rand.Generate(length, range, null_probability);
-      ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortToIndices(*array));
-      ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
-                                *checked_pointer_cast<UInt64Array>(offsets));
+      for (auto order : {SortOrder::ASCENDING, SortOrder::DESCENDING}) {
+        auto array = rand.Generate(length, range, null_probability);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortIndices(*array, order));
+        ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
+                                  *checked_pointer_cast<UInt64Array>(offsets), order);
+      }
     }
   }
 }
 
 // Long array with big value range: std::stable_sort
-TYPED_TEST_SUITE(TestSortToIndicesKernelRandomCompare, IntegralArrowTypes);
+TYPED_TEST_SUITE(TestArraySortIndicesKernelRandomCompare, IntegralArrowTypes);
 
-TYPED_TEST(TestSortToIndicesKernelRandomCompare, SortRandomValuesCompare) {
+TYPED_TEST(TestArraySortIndicesKernelRandomCompare, SortRandomValuesCompare) {
   using ArrayType = typename TypeTraits<TypeParam>::ArrayType;
 
   Random<TypeParam> rand(0x5487657);
   int times = 5;
   int length = 4000;
   for (int test = 0; test < times; test++) {
     for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
-      auto array = rand.Generate(length, null_probability);
-      ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortToIndices(*array));
-      ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
-                                *checked_pointer_cast<UInt64Array>(offsets));
+      for (auto order : {SortOrder::ASCENDING, SortOrder::DESCENDING}) {
+        auto array = rand.Generate(length, null_probability);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortIndices(*array, order));
+        ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
+                                  *checked_pointer_cast<UInt64Array>(offsets), order);
+      }
     }
   }
 }
 
+class TestTableSortIndices : public ::testing::Test {
+ protected:
+  void AssertSortIndices(const std::shared_ptr<Table> table, const SortOptions& options,
+                         const std::shared_ptr<Array> expected) {
+    ASSERT_OK_AND_ASSIGN(auto actual, SortIndices(*table, options));
+    AssertArraysEqual(*expected, *actual);
+  }
+
+  void AssertSortIndices(const std::shared_ptr<Table> table, const SortOptions& options,
+                         const std::string expected) {
+    AssertSortIndices(table, options, ArrayFromJSON(uint64(), expected));
+  }
+};
+
+TEST_F(TestTableSortIndices, SortNull) {
+  auto table = TableFromJSON(schema({
+                                 {field("a", uint8())},
+                                 {field("b", uint8())},
+                             }),
+                             {"["
+                              "{\"a\": null, \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 3},"
+                              "{\"a\": 3,    \"b\": null},"
+                              "{\"a\": null, \"b\": null},"
+                              "{\"a\": 2,    \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 5}"
+                              "]"});
+  SortOptions options(
+      {SortKey("a", SortOrder::ASCENDING), SortKey("b", SortOrder::DESCENDING)});
+  this->AssertSortIndices(table, options, "[5, 1, 4, 2, 0, 3]");
+}
+
+TEST_F(TestTableSortIndices, SortNaN) {
+  auto table = TableFromJSON(schema({
+                                 {field("a", float32())},
+                                 {field("b", float32())},
+                             }),
+                             {"["
+                              "{\"a\": null, \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 3},"
+                              "{\"a\": 3,    \"b\": null},"
+                              "{\"a\": null, \"b\": null},"
+                              "{\"a\": NaN,  \"b\": null},"
+                              "{\"a\": NaN,  \"b\": 5},"
+                              "{\"a\": NaN,  \"b\": NaN},"
+                              "{\"a\": 1,    \"b\": 5}"
+                              "]"});
+  SortOptions options(
+      {SortKey("a", SortOrder::ASCENDING), SortKey("b", SortOrder::DESCENDING)});
+  this->AssertSortIndices(table, options, "[7, 1, 2, 5, 6, 4, 0, 3]");
+}
+
+using RandomParam = std::tuple<std::string, double>;
+class TestTableSortIndicesRandom : public testing::TestWithParam<RandomParam> {
+  class Comparator : public TypeVisitor {
+   public:
+    bool operator()(const Table& table, const SortOptions& options, uint64_t lhs,
+                    uint64_t rhs) {
+      lhs_ = lhs;
+      rhs_ = rhs;
+      for (const auto& sort_key : options.sort_keys) {
+        auto chunked_array = table.GetColumnByName(sort_key.name);
+        lhs_array_ = findTargetArray(chunked_array, lhs, lhs_index_);
+        rhs_array_ = findTargetArray(chunked_array, rhs, rhs_index_);
+        if (rhs_array_->IsNull(rhs_index_) && lhs_array_->IsNull(lhs_index_)) continue;
+        if (rhs_array_->IsNull(rhs_index_)) return true;
+        if (lhs_array_->IsNull(lhs_index_)) return false;
+        status_ = lhs_array_->type()->Accept(this);
+        if (compared_ == 0) continue;
+        if (sort_key.order == SortOrder::ASCENDING) {
+          return compared_ < 0;
+        } else {
+          return compared_ > 0;
+        }
+      }
+      return lhs < rhs;
+    }
+
+    Status status() const { return status_; }
+
+#define VISIT(TYPE)                               \
+  Status Visit(const TYPE##Type& type) override { \
+    compared_ = CompareType<TYPE##Type>();        \
+    return Status::OK();                          \
+  }
+
+    VISIT(Int8)
+    VISIT(Int16)
+    VISIT(Int32)
+    VISIT(Int64)
+    VISIT(UInt8)
+    VISIT(UInt16)
+    VISIT(UInt32)
+    VISIT(UInt64)
+    VISIT(Float)
+    VISIT(Double)
+    VISIT(String)
+
+#undef VISIT
+
+   private:
+    std::shared_ptr<Array> findTargetArray(std::shared_ptr<ChunkedArray> chunked_array,

Review comment:
       I would expect `const Array* FindTargetArray(const ChunkedArray&, int64_t index, int64_t* chunk_index)`

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;
+    std::vector<Array*> chunks;
+  };
+
+  class Comparer : public TypeVisitor {
+   public:
+    Comparer(const Table& table, const std::vector<SortKey>& sort_keys)
+        : TypeVisitor(), status_(Status::OK()) {
+      for (const auto& sort_key : sort_keys) {
+        const auto& chunked_array = table.GetColumnByName(sort_key.name);
+        if (!chunked_array) {
+          status_ = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+          return;
+        }
+        sort_keys_.emplace_back(*chunked_array, sort_key.order);
+      }
+    }
+
+    Status status() { return status_; }
+
+    const std::vector<ResolvedSortKey>& sort_keys() { return sort_keys_; }
+
+    bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {
+      current_left_ = left;
+      current_right_ = right;
+      current_compared_ = 0;
+      auto num_sort_keys = sort_keys_.size();
+      for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+        current_sort_key_index_ = i;
+        status_ = sort_keys_[i].type->Accept(this);
+        if (current_compared_ != 0) {
+          break;
+        }
+      }
+      return current_compared_ < 0;
+    }
+
+#define VISIT(TYPE)                                \
+  Status Visit(const TYPE##Type& type) override {  \
+    current_compared_ = CompareType<TYPE##Type>(); \
+    return Status::OK();                           \
+  }
+
+    VISIT(Int8)
+    VISIT(Int16)
+    VISIT(Int32)
+    VISIT(Int64)
+    VISIT(UInt8)
+    VISIT(UInt16)
+    VISIT(UInt32)
+    VISIT(UInt64)
+    VISIT(Float)
+    VISIT(Double)
+    VISIT(String)
+    VISIT(Binary)
+    VISIT(LargeString)
+    VISIT(LargeBinary)
+
+#undef VISIT
+
+   private:
+    template <typename Type>
+    int32_t CompareType() {
+      using ArrayType = typename TypeTraits<Type>::ArrayType;
+      const auto& sort_key = sort_keys_[current_sort_key_index_];
+      auto order = sort_key.order;
+      int64_t index_left = 0;
+      auto array_left = sort_key.ResolveChunk<ArrayType>(current_left_, index_left);
+      int64_t index_right = 0;
+      auto array_right = sort_key.ResolveChunk<ArrayType>(current_right_, index_right);
+      if (sort_key.null_count > 0) {
+        auto is_null_left = array_left->IsNull(index_left);
+        auto is_null_right = array_right->IsNull(index_right);
+        if (is_null_left && is_null_right) {
+          return 0;
+        } else if (is_null_left) {
+          return 1;
+        } else if (is_null_right) {
+          return -1;
+        }
+      }
+      auto left = array_left->GetView(index_left);
+      auto right = array_right->GetView(index_right);
+      int32_t compared;
+      if (left == right) {
+        compared = 0;
+      } else if (left > right) {
+        compared = 1;
+      } else {
+        compared = -1;
+      }
+      if (order == SortOrder::DESCENDING) {
+        compared = -compared;
+      }
+      return compared;
+    }
+
+    Status status_;
+    std::vector<ResolvedSortKey> sort_keys_;
+    int64_t current_left_;
+    int64_t current_right_;
+    size_t current_sort_key_index_;
+    int32_t current_compared_;
+  };
+
+ public:
+  TableSorter(uint64_t* indices_begin, uint64_t* indices_end, const Table& table,
+              const SortOptions& options)
+      : indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        comparer_(table, options.sort_keys) {}
+
+  Status Sort() {
+    ARROW_RETURN_NOT_OK(comparer_.status());
+    return comparer_.sort_keys()[0].type->Accept(this);
+  }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE##Type& type) override { return SortInternal<TYPE##Type>(); }
+
+  VISIT(Int8)
+  VISIT(Int16)
+  VISIT(Int32)
+  VISIT(Int64)
+  VISIT(UInt8)
+  VISIT(UInt16)
+  VISIT(UInt32)
+  VISIT(UInt64)
+  VISIT(Float)
+  VISIT(Double)
+  VISIT(String)
+  VISIT(Binary)
+  VISIT(LargeString)
+  VISIT(LargeBinary)
+
+#undef VISIT
+
+ private:
+  template <typename Type>
+  Status SortInternal() {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    std::iota(indices_begin_, indices_end_, 0);
+
+    auto& comparer = comparer_;
+    const auto& first_sort_key = comparer.sort_keys()[0];
+    auto nulls_begin = indices_end_;
+    nulls_begin = PartitionNullsInternal<Type>(first_sort_key);
+    std::stable_sort(
+        indices_begin_, nulls_begin,
+        [&first_sort_key, &comparer](uint64_t left, uint64_t right) {
+          int64_t index_left = 0;
+          auto array_left = first_sort_key.ResolveChunk<ArrayType>(left, index_left);
+          int64_t index_right = 0;
+          auto array_right = first_sort_key.ResolveChunk<ArrayType>(right, index_right);
+          auto value_left = array_left->GetView(index_left);
+          auto value_right = array_right->GetView(index_right);
+          if (value_left == value_right) {
+            return comparer.Compare(left, right, 1);

Review comment:
       Performance will probably be very bad if there are lots of equal values in the first column, right?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort_test.cc
##########
@@ -228,134 +233,165 @@ TYPED_TEST(TestNthToIndicesRandom, RandomValues) {
 using arrow::internal::checked_pointer_cast;
 
 template <typename ArrowType>
-class TestSortToIndicesKernel : public TestBase {
+class TestArraySortIndicesKernel : public TestBase {
  private:
-  void AssertSortToIndicesArrays(const std::shared_ptr<Array> values,
-                                 const std::shared_ptr<Array> expected) {
-    ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> actual, SortToIndices(*values));
+  void AssertArraysSortIndices(const std::shared_ptr<Array> values, SortOrder order,
+                               const std::shared_ptr<Array> expected) {
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> actual, SortIndices(*values, order));
     ASSERT_OK(actual->ValidateFull());
     AssertArraysEqual(*expected, *actual);
   }
 
  protected:
-  virtual void AssertSortToIndices(const std::string& values,
-                                   const std::string& expected) {
+  virtual void AssertSortIndices(const std::string& values, SortOrder order,
+                                 const std::string& expected) {
     auto type = TypeTraits<ArrowType>::type_singleton();
-    AssertSortToIndicesArrays(ArrayFromJSON(type, values),
-                              ArrayFromJSON(uint64(), expected));
+    AssertArraysSortIndices(ArrayFromJSON(type, values), order,
+                            ArrayFromJSON(uint64(), expected));
+  }
+
+  virtual void AssertSortIndices(const std::string& values, const std::string& expected) {
+    AssertSortIndices(values, SortOrder::ASCENDING, expected);
   }
 };
 
 template <typename ArrowType>
-class TestSortToIndicesKernelForReal : public TestSortToIndicesKernel<ArrowType> {};
-TYPED_TEST_SUITE(TestSortToIndicesKernelForReal, RealArrowTypes);
+class TestArraySortIndicesKernelForReal : public TestArraySortIndicesKernel<ArrowType> {};
+TYPED_TEST_SUITE(TestArraySortIndicesKernelForReal, RealArrowTypes);
 
 template <typename ArrowType>
-class TestSortToIndicesKernelForIntegral : public TestSortToIndicesKernel<ArrowType> {};
-TYPED_TEST_SUITE(TestSortToIndicesKernelForIntegral, IntegralArrowTypes);
+class TestArraySortIndicesKernelForIntegral
+    : public TestArraySortIndicesKernel<ArrowType> {};
+TYPED_TEST_SUITE(TestArraySortIndicesKernelForIntegral, IntegralArrowTypes);
 
 template <typename ArrowType>
-class TestSortToIndicesKernelForStrings : public TestSortToIndicesKernel<ArrowType> {};
-TYPED_TEST_SUITE(TestSortToIndicesKernelForStrings, testing::Types<StringType>);
+class TestArraySortIndicesKernelForStrings
+    : public TestArraySortIndicesKernel<ArrowType> {};
+TYPED_TEST_SUITE(TestArraySortIndicesKernelForStrings, testing::Types<StringType>);
+
+TYPED_TEST(TestArraySortIndicesKernelForReal, SortReal) {
+  this->AssertSortIndices("[]", "[]");
+
+  this->AssertSortIndices("[3.4, 2.6, 6.3]", "[1, 0, 2]");
+  this->AssertSortIndices("[1.1, 2.4, 3.5, 4.3, 5.1, 6.8, 7.3]", "[0,1,2,3,4,5,6]");

Review comment:
       Nit, but can you always use the same convention for spacing JSON values?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort_benchmark.cc
##########
@@ -56,17 +57,99 @@ static void SortToIndicesInt64Compare(benchmark::State& state) {
   auto max = std::numeric_limits<int64_t>::max();
   auto values = rand.Int64(array_size, min, max, args.null_proportion);
 
-  SortToIndicesBenchmark(state, values);
+  ArraySortIndicesBenchmark(state, values);
 }
 
-BENCHMARK(SortToIndicesInt64Count)
+static void TableSortIndicesBenchmark(benchmark::State& state,
+                                      const std::shared_ptr<Table>& table,
+                                      const SortOptions& options) {
+  for (auto _ : state) {
+    ABORT_NOT_OK(SortIndices(*table, options).status());
+  }
+  state.SetItemsProcessed(state.iterations() * table->num_rows());
+}
+
+static void TableSortIndicesInt64Count(benchmark::State& state) {
+  RegressionArgs args(state);
+
+  const int64_t array_size = args.size / sizeof(int64_t);
+  auto rand = random::RandomArrayGenerator(kSeed);
+  auto values = rand.Int64(array_size, -100, 100, args.null_proportion);
+  std::vector<std::shared_ptr<Field>> fields = {{field("int64", int64())}};
+  auto table = Table::Make(schema(fields), {values}, array_size);
+  SortOptions options({SortKey("int64", SortOrder::ASCENDING)});
+
+  TableSortIndicesBenchmark(state, table, options);
+}
+
+static void TableSortIndicesInt64Compare(benchmark::State& state) {
+  RegressionArgs args(state);
+
+  const int64_t array_size = args.size / sizeof(int64_t);
+  auto rand = random::RandomArrayGenerator(kSeed);
+
+  auto min = std::numeric_limits<int64_t>::min();
+  auto max = std::numeric_limits<int64_t>::max();
+  auto values = rand.Int64(array_size, min, max, args.null_proportion);
+  std::vector<std::shared_ptr<Field>> fields = {{field("int64", int64())}};
+  auto table = Table::Make(schema(fields), {values}, array_size);
+  SortOptions options({SortKey("int64", SortOrder::ASCENDING)});
+
+  TableSortIndicesBenchmark(state, table, options);
+}
+
+static void TableSortIndicesInt64Int64(benchmark::State& state) {
+  RegressionArgs args(state);
+
+  const int64_t array_size = args.size / sizeof(int64_t);
+  auto rand = random::RandomArrayGenerator(kSeed);
+
+  auto min = std::numeric_limits<int64_t>::min();
+  auto max = std::numeric_limits<int64_t>::max();

Review comment:
       You will probably get almost no duplicate values with this.

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {
+      if (num_chunks == 1) {
+        chunk_index = index;
+        return static_cast<ArrayType*>(chunks[0]);
+      } else {
+        int64_t offset = 0;
+        for (size_t i = 0; i < num_chunks; ++i) {
+          if (index < offset + chunks[i]->length()) {
+            chunk_index = index - offset;
+            return static_cast<ArrayType*>(chunks[i]);
+          }
+          offset += chunks[i]->length();
+        }
+        return nullptr;
+      }
+    }
+
+    SortOrder order;
+    DataType* type;
+    int64_t null_count;
+    size_t num_chunks;
+    std::vector<Array*> chunks;
+  };
+
+  class Comparer : public TypeVisitor {
+   public:
+    Comparer(const Table& table, const std::vector<SortKey>& sort_keys)
+        : TypeVisitor(), status_(Status::OK()) {
+      for (const auto& sort_key : sort_keys) {
+        const auto& chunked_array = table.GetColumnByName(sort_key.name);
+        if (!chunked_array) {
+          status_ = Status::Invalid("Nonexistent sort key column: ", sort_key.name);
+          return;
+        }
+        sort_keys_.emplace_back(*chunked_array, sort_key.order);
+      }
+    }
+
+    Status status() { return status_; }
+
+    const std::vector<ResolvedSortKey>& sort_keys() { return sort_keys_; }
+
+    bool Compare(uint64_t left, uint64_t right, size_t start_sort_key_index) {
+      current_left_ = left;
+      current_right_ = right;
+      current_compared_ = 0;
+      auto num_sort_keys = sort_keys_.size();
+      for (size_t i = start_sort_key_index; i < num_sort_keys; ++i) {
+        current_sort_key_index_ = i;
+        status_ = sort_keys_[i].type->Accept(this);
+        if (current_compared_ != 0) {
+          break;
+        }
+      }
+      return current_compared_ < 0;
+    }
+
+#define VISIT(TYPE)                                \
+  Status Visit(const TYPE##Type& type) override {  \
+    current_compared_ = CompareType<TYPE##Type>(); \
+    return Status::OK();                           \
+  }
+
+    VISIT(Int8)
+    VISIT(Int16)
+    VISIT(Int32)
+    VISIT(Int64)
+    VISIT(UInt8)
+    VISIT(UInt16)
+    VISIT(UInt32)
+    VISIT(UInt64)
+    VISIT(Float)
+    VISIT(Double)
+    VISIT(String)
+    VISIT(Binary)
+    VISIT(LargeString)
+    VISIT(LargeBinary)
+
+#undef VISIT
+
+   private:
+    template <typename Type>
+    int32_t CompareType() {
+      using ArrayType = typename TypeTraits<Type>::ArrayType;
+      const auto& sort_key = sort_keys_[current_sort_key_index_];
+      auto order = sort_key.order;
+      int64_t index_left = 0;
+      auto array_left = sort_key.ResolveChunk<ArrayType>(current_left_, index_left);
+      int64_t index_right = 0;
+      auto array_right = sort_key.ResolveChunk<ArrayType>(current_right_, index_right);
+      if (sort_key.null_count > 0) {
+        auto is_null_left = array_left->IsNull(index_left);
+        auto is_null_right = array_right->IsNull(index_right);
+        if (is_null_left && is_null_right) {
+          return 0;
+        } else if (is_null_left) {
+          return 1;
+        } else if (is_null_right) {
+          return -1;
+        }
+      }
+      auto left = array_left->GetView(index_left);
+      auto right = array_right->GetView(index_right);
+      int32_t compared;
+      if (left == right) {
+        compared = 0;
+      } else if (left > right) {
+        compared = 1;
+      } else {
+        compared = -1;
+      }
+      if (order == SortOrder::DESCENDING) {
+        compared = -compared;
+      }
+      return compared;
+    }
+
+    Status status_;
+    std::vector<ResolvedSortKey> sort_keys_;
+    int64_t current_left_;
+    int64_t current_right_;
+    size_t current_sort_key_index_;
+    int32_t current_compared_;
+  };
+
+ public:
+  TableSorter(uint64_t* indices_begin, uint64_t* indices_end, const Table& table,
+              const SortOptions& options)
+      : indices_begin_(indices_begin),
+        indices_end_(indices_end),
+        comparer_(table, options.sort_keys) {}
+
+  Status Sort() {
+    ARROW_RETURN_NOT_OK(comparer_.status());
+    return comparer_.sort_keys()[0].type->Accept(this);
+  }
+
+#define VISIT(TYPE) \
+  Status Visit(const TYPE##Type& type) override { return SortInternal<TYPE##Type>(); }
+
+  VISIT(Int8)
+  VISIT(Int16)
+  VISIT(Int32)
+  VISIT(Int64)
+  VISIT(UInt8)
+  VISIT(UInt16)
+  VISIT(UInt32)
+  VISIT(UInt64)
+  VISIT(Float)
+  VISIT(Double)
+  VISIT(String)
+  VISIT(Binary)
+  VISIT(LargeString)
+  VISIT(LargeBinary)
+
+#undef VISIT
+
+ private:
+  template <typename Type>
+  Status SortInternal() {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    std::iota(indices_begin_, indices_end_, 0);
+
+    auto& comparer = comparer_;
+    const auto& first_sort_key = comparer.sort_keys()[0];
+    auto nulls_begin = indices_end_;
+    nulls_begin = PartitionNullsInternal<Type>(first_sort_key);
+    std::stable_sort(
+        indices_begin_, nulls_begin,
+        [&first_sort_key, &comparer](uint64_t left, uint64_t right) {
+          int64_t index_left = 0;
+          auto array_left = first_sort_key.ResolveChunk<ArrayType>(left, index_left);
+          int64_t index_right = 0;
+          auto array_right = first_sort_key.ResolveChunk<ArrayType>(right, index_right);
+          auto value_left = array_left->GetView(index_left);
+          auto value_right = array_right->GetView(index_right);
+          if (value_left == value_right) {
+            return comparer.Compare(left, right, 1);
+          } else {
+            auto compared = value_left < value_right;
+            if (first_sort_key.order == SortOrder::ASCENDING) {
+              return compared;
+            } else {
+              return !compared;
+            }
+          }
+        });
+    return Status::OK();
+  }
+
+  template <typename Type>
+  enable_if_t<!is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
+      const ResolvedSortKey& first_sort_key) {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    if (first_sort_key.null_count == 0) {
+      return indices_end_;
+    }
+    StablePartitioner partitioner;
+    auto nulls_begin =
+        partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+          int64_t index_chunk = 0;
+          auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+          return !chunk->IsNull(index_chunk);
+        });
+    auto& comparer = comparer_;
+    std::stable_sort(nulls_begin, indices_end_,
+                     [&comparer](uint64_t left, uint64_t right) {
+                       return comparer.Compare(left, right, 1);
+                     });
+    return nulls_begin;
+  }
+
+  template <typename Type>
+  enable_if_t<is_floating_type<Type>::value, uint64_t*> PartitionNullsInternal(
+      const ResolvedSortKey& first_sort_key) {
+    using ArrayType = typename TypeTraits<Type>::ArrayType;
+    StablePartitioner partitioner;
+    if (first_sort_key.null_count == 0) {
+      return partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+        int64_t index_chunk = 0;
+        auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+        return !std::isnan(chunk->GetView(index_chunk));
+      });
+    }
+    auto nans_and_nulls_begin =
+        partitioner(indices_begin_, indices_end_, [&first_sort_key](uint64_t index) {
+          int64_t index_chunk = 0;
+          auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+          return !chunk->IsNull(index_chunk) && !std::isnan(chunk->GetView(index_chunk));
+        });
+    auto nulls_begin = nans_and_nulls_begin;
+    if (first_sort_key.null_count < static_cast<int64_t>(indices_end_ - nulls_begin)) {
+      // move Nulls after NaN
+      nulls_begin = partitioner(
+          nans_and_nulls_begin, indices_end_, [&first_sort_key](uint64_t index) {
+            int64_t index_chunk = 0;
+            auto chunk = first_sort_key.ResolveChunk<ArrayType>(index, index_chunk);
+            return !chunk->IsNull(index_chunk);
+          });
+    }
+    auto& comparer = comparer_;
+    if (nans_and_nulls_begin != nulls_begin) {
+      std::stable_sort(nans_and_nulls_begin, nulls_begin,
+                       [&comparer](uint64_t left, uint64_t right) {
+                         return comparer.Compare(left, right, 1);
+                       });
+    }
+    std::stable_sort(nulls_begin, indices_end_,
+                     [&comparer](uint64_t left, uint64_t right) {
+                       return comparer.Compare(left, right, 1);
+                     });
+    return nans_and_nulls_begin;
+  }
+
+  uint64_t* indices_begin_;
+  uint64_t* indices_end_;
+  Comparer comparer_;
+};
+
 const FunctionDoc sort_indices_doc(
+    "Return the indices that would sort an array, record batch or table",
+    ("This function computes an array of indices that define a stable sort\n"
+     "of the input array, record batch or table.  Null values are considered\n"
+     "greater than any other value and are therefore sorted at the end of the\n"
+     "input. For floating-point types, NaNs are considered greater than any\n"
+     "other non-null value, but smaller than null values."),
+    {"input"}, "SortOptions");
+
+class SortIndicesMetaFunction : public MetaFunction {
+ public:
+  SortIndicesMetaFunction()
+      : MetaFunction("sort_indices", Arity::Unary(), &sort_indices_doc) {}
+
+  Result<Datum> ExecuteImpl(const std::vector<Datum>& args,
+                            const FunctionOptions* options,
+                            ExecContext* ctx) const override {
+    const SortOptions& sort_options = static_cast<const SortOptions&>(*options);
+    switch (args[0].kind()) {
+      case Datum::ARRAY:
+        return SortIndices(*args[0].make_array(), sort_options, ctx);
+        break;
+      case Datum::CHUNKED_ARRAY:
+        return SortIndices(*args[0].chunked_array(), sort_options, ctx);
+        break;
+      case Datum::RECORD_BATCH: {
+        ARROW_ASSIGN_OR_RAISE(auto table,
+                              Table::FromRecordBatches({args[0].record_batch()}));
+        return SortIndices(*table, sort_options, ctx);
+      } break;
+      case Datum::TABLE:
+        return SortIndices(*args[0].table(), sort_options, ctx);
+        break;
+      default:
+        break;
+    }
+    return Status::NotImplemented(
+        "Unsupported types for sort_indices operation: "
+        "values=",
+        args[0].ToString());
+  }
+
+ private:
+  Result<std::shared_ptr<Array>> SortIndices(const Array& values,
+                                             const SortOptions& options,
+                                             ExecContext* ctx) const {
+    SortOrder order = SortOrder::ASCENDING;
+    if (!options.sort_keys.empty()) {
+      order = options.sort_keys[0].order;
+    }
+    ArraySortOptions array_options(order);
+    ARROW_ASSIGN_OR_RAISE(
+        Datum result, CallFunction("array_sort_indices", {values}, &array_options, ctx));
+    return result.make_array();
+  }
+
+  Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& values,
+                                             const SortOptions& options,
+                                             ExecContext* ctx) const {
+    SortOrder order = SortOrder::ASCENDING;
+    if (!options.sort_keys.empty()) {
+      order = options.sort_keys[0].order;
+    }
+    ArraySortOptions array_options(order);
+
+    std::shared_ptr<Array> array_values;
+    if (values.num_chunks() == 1) {
+      array_values = values.chunk(0);
+    } else {
+      ARROW_ASSIGN_OR_RAISE(array_values,
+                            Concatenate(values.chunks(), ctx->memory_pool()));
+    }
+    ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("array_sort_indices", {array_values},
+                                                     &array_options, ctx));
+    return result.make_array();
+  }
+
+  Result<Datum> SortIndices(const Table& table, const SortOptions& options,
+                            ExecContext* ctx) const {
+    auto n_sort_keys = options.sort_keys.size();
+    if (n_sort_keys == 0) {
+      return Status::Invalid("Must specify one or more sort keys");
+    }
+    if (n_sort_keys == 1) {
+      auto chunked_array = table.GetColumnByName(options.sort_keys[0].name);
+      if (!chunked_array) {
+        return Status::Invalid("Nonexistent sort key column: ",
+                               options.sort_keys[0].name);
+      }
+      return SortIndices(*chunked_array, options, ctx);
+    }
+
+    auto out_type = uint64();
+    auto length = table.num_rows();
+    auto buffer_size = BitUtil::BytesForBits(
+        length * std::static_pointer_cast<UInt64Type>(out_type)->bit_width());

Review comment:
       `sizeof(uint64_t)` perhaps?

##########
File path: cpp/src/arrow/compute/kernels/vector_sort_test.cc
##########
@@ -364,32 +400,264 @@ TYPED_TEST(TestSortToIndicesKernelRandomCount, SortRandomValuesCount) {
   int range = 2000;
   for (int test = 0; test < times; test++) {
     for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
-      auto array = rand.Generate(length, range, null_probability);
-      ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortToIndices(*array));
-      ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
-                                *checked_pointer_cast<UInt64Array>(offsets));
+      for (auto order : {SortOrder::ASCENDING, SortOrder::DESCENDING}) {
+        auto array = rand.Generate(length, range, null_probability);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortIndices(*array, order));
+        ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
+                                  *checked_pointer_cast<UInt64Array>(offsets), order);
+      }
     }
   }
 }
 
 // Long array with big value range: std::stable_sort
-TYPED_TEST_SUITE(TestSortToIndicesKernelRandomCompare, IntegralArrowTypes);
+TYPED_TEST_SUITE(TestArraySortIndicesKernelRandomCompare, IntegralArrowTypes);
 
-TYPED_TEST(TestSortToIndicesKernelRandomCompare, SortRandomValuesCompare) {
+TYPED_TEST(TestArraySortIndicesKernelRandomCompare, SortRandomValuesCompare) {
   using ArrayType = typename TypeTraits<TypeParam>::ArrayType;
 
   Random<TypeParam> rand(0x5487657);
   int times = 5;
   int length = 4000;
   for (int test = 0; test < times; test++) {
     for (auto null_probability : {0.0, 0.1, 0.5, 1.0}) {
-      auto array = rand.Generate(length, null_probability);
-      ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortToIndices(*array));
-      ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
-                                *checked_pointer_cast<UInt64Array>(offsets));
+      for (auto order : {SortOrder::ASCENDING, SortOrder::DESCENDING}) {
+        auto array = rand.Generate(length, null_probability);
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> offsets, SortIndices(*array, order));
+        ValidateSorted<ArrayType>(*checked_pointer_cast<ArrayType>(array),
+                                  *checked_pointer_cast<UInt64Array>(offsets), order);
+      }
     }
   }
 }
 
+class TestTableSortIndices : public ::testing::Test {
+ protected:
+  void AssertSortIndices(const std::shared_ptr<Table> table, const SortOptions& options,
+                         const std::shared_ptr<Array> expected) {
+    ASSERT_OK_AND_ASSIGN(auto actual, SortIndices(*table, options));
+    AssertArraysEqual(*expected, *actual);
+  }
+
+  void AssertSortIndices(const std::shared_ptr<Table> table, const SortOptions& options,
+                         const std::string expected) {
+    AssertSortIndices(table, options, ArrayFromJSON(uint64(), expected));
+  }
+};
+
+TEST_F(TestTableSortIndices, SortNull) {
+  auto table = TableFromJSON(schema({
+                                 {field("a", uint8())},
+                                 {field("b", uint8())},
+                             }),
+                             {"["
+                              "{\"a\": null, \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 3},"
+                              "{\"a\": 3,    \"b\": null},"
+                              "{\"a\": null, \"b\": null},"
+                              "{\"a\": 2,    \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 5}"
+                              "]"});
+  SortOptions options(
+      {SortKey("a", SortOrder::ASCENDING), SortKey("b", SortOrder::DESCENDING)});
+  this->AssertSortIndices(table, options, "[5, 1, 4, 2, 0, 3]");
+}
+
+TEST_F(TestTableSortIndices, SortNaN) {
+  auto table = TableFromJSON(schema({
+                                 {field("a", float32())},
+                                 {field("b", float32())},
+                             }),
+                             {"["
+                              "{\"a\": null, \"b\": 5},"
+                              "{\"a\": 1,    \"b\": 3},"
+                              "{\"a\": 3,    \"b\": null},"
+                              "{\"a\": null, \"b\": null},"
+                              "{\"a\": NaN,  \"b\": null},"
+                              "{\"a\": NaN,  \"b\": 5},"
+                              "{\"a\": NaN,  \"b\": NaN},"
+                              "{\"a\": 1,    \"b\": 5}"
+                              "]"});
+  SortOptions options(
+      {SortKey("a", SortOrder::ASCENDING), SortKey("b", SortOrder::DESCENDING)});
+  this->AssertSortIndices(table, options, "[7, 1, 2, 5, 6, 4, 0, 3]");
+}
+
+using RandomParam = std::tuple<std::string, double>;
+class TestTableSortIndicesRandom : public testing::TestWithParam<RandomParam> {
+  class Comparator : public TypeVisitor {

Review comment:
       Can you add comments when you define non-trivial classes and methods? This would help reading the code later.

##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -346,14 +374,396 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
   }
 }
 
+class TableSorter : public TypeVisitor {
+ private:
+  struct ResolvedSortKey {
+    ResolvedSortKey(const ChunkedArray& chunked_array, const SortOrder order)
+        : order(order) {
+      type = chunked_array.type().get();
+      null_count = chunked_array.null_count();
+      num_chunks = chunked_array.num_chunks();
+      for (const auto& chunk : chunked_array.chunks()) {
+        chunks.push_back(chunk.get());
+      }
+    }
+
+    template <typename ArrayType>
+    ArrayType* ResolveChunk(int64_t index, int64_t& chunk_index) const {

Review comment:
       Also, please make it `int64_t* chunk_index`. Out-parameters should be pointers.
   (you could also return a `std::pair` or a dedicated struct... anyway)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org