You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/14 16:28:02 UTC

[GitHub] [arrow] aocsa commented on a change in pull request #12582: ARROW-14183: [C++] Improve select_k_unstable performance

aocsa commented on a change in pull request #12582:
URL: https://github.com/apache/arrow/pull/12582#discussion_r826144290



##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -1734,105 +1768,155 @@ class TableSelecter : public TypeVisitor {
     return resolved;
   }
 
-  // Behaves like PartitionNulls() but this supports multiple sort keys.
-  template <typename Type>
-  NullPartitionResult PartitionNullsInternal(uint64_t* indices_begin,
-                                             uint64_t* indices_end,
-                                             const ResolvedSortKey& first_sort_key) {
-    using ArrayType = typename TypeTraits<Type>::ArrayType;
-
-    const auto p = PartitionNullsOnly<StablePartitioner>(
-        indices_begin, indices_end, first_sort_key.resolver, first_sort_key.null_count,
-        NullPlacement::AtEnd);
-    DCHECK_EQ(p.nulls_end - p.nulls_begin, first_sort_key.null_count);
-
-    const auto q = PartitionNullLikes<ArrayType, StablePartitioner>(
-        p.non_nulls_begin, p.non_nulls_end, first_sort_key.resolver,
-        NullPlacement::AtEnd);
-
-    auto& comparator = comparator_;
-    // Sort all NaNs by the second and following sort keys.
-    std::stable_sort(q.nulls_begin, q.nulls_end, [&](uint64_t left, uint64_t right) {
-      return comparator.Compare(left, right, 1);
-    });
-    // Sort all nulls by the second and following sort keys.
-    std::stable_sort(p.nulls_begin, p.nulls_end, [&](uint64_t left, uint64_t right) {
-      return comparator.Compare(left, right, 1);
-    });
-
-    return q;
-  }
-
-  // XXX this implementation is rather inefficient as it computes chunk indices
-  // at every comparison.  Instead we should iterate over individual batches
-  // and remember ChunkLocation entries in the max-heap.
-
   template <typename InType, SortOrder sort_order>
   Status SelectKthInternal() {
-    using ArrayType = typename TypeTraits<InType>::ArrayType;
-    auto& comparator = comparator_;
-    const auto& first_sort_key = sort_keys_[0];
-
     const auto num_rows = table_.num_rows();
     if (num_rows == 0) {
       return Status::OK();
     }
     if (k_ > table_.num_rows()) {
       k_ = table_.num_rows();
     }
-    std::function<bool(const uint64_t&, const uint64_t&)> cmp;
-    SelectKComparator<sort_order> select_k_comparator;
-    cmp = [&](const uint64_t& left, const uint64_t& right) -> bool {
-      auto chunk_left = first_sort_key.template GetChunk<ArrayType>(left);
-      auto chunk_right = first_sort_key.template GetChunk<ArrayType>(right);
-      auto value_left = chunk_left.Value();
-      auto value_right = chunk_right.Value();
-      if (value_left == value_right) {
-        return comparator.Compare(left, right, 1);
-      }
-      return select_k_comparator(value_left, value_right);
-    };
-    using HeapContainer =
-        std::priority_queue<uint64_t, std::vector<uint64_t>, decltype(cmp)>;
 
-    std::vector<uint64_t> indices(num_rows);
-    uint64_t* indices_begin = indices.data();
-    uint64_t* indices_end = indices_begin + indices.size();
+    SortOptions sort_options(options_.sort_keys, NullPlacement::AtEnd);
+    ARROW_ASSIGN_OR_RAISE(
+        auto row_indices,
+        MakeMutableUInt64Array(uint64(), table_.num_rows(), ctx_->memory_pool()));
+    auto indices_begin = row_indices->GetMutableValues<uint64_t>(1);
+    auto indices_end = indices_begin + table_.num_rows();
     std::iota(indices_begin, indices_end, 0);
 
-    const auto p =
-        this->PartitionNullsInternal<InType>(indices_begin, indices_end, first_sort_key);
-    const auto end_iter = p.non_nulls_end;
-    auto kth_begin = std::min(indices_begin + k_, end_iter);
+    RecordBatchVector batches;
+    {
+      TableBatchReader reader(table_);
+      RETURN_NOT_OK(reader.ReadAll(&batches));
+    }
+    const int64_t num_batches = static_cast<int64_t>(batches.size());
+    if (num_batches == 0) {
+      return Status::OK();
+    }
+    std::vector<NullPartitionResult> sorted(num_batches);
 
-    HeapContainer heap(indices_begin, kth_begin, cmp);
-    for (auto iter = kth_begin; iter != end_iter && !heap.empty(); ++iter) {
-      uint64_t x_index = *iter;
-      uint64_t top_item = heap.top();
-      if (cmp(x_index, top_item)) {
-        heap.pop();
-        heap.push(x_index);
-      }
+    // First sort all individual batches
+    int64_t begin_offset = 0;
+    int64_t end_offset = 0;
+    int64_t null_count = 0;
+    for (int64_t i = 0; i < num_batches; ++i) {
+      const auto& batch = *batches[i];
+      end_offset += batch.num_rows();
+      RadixRecordBatchSorter sorter(indices_begin + begin_offset,
+                                    indices_begin + end_offset, batch, sort_options);
+      ARROW_ASSIGN_OR_RAISE(sorted[i], sorter.Sort(begin_offset));
+      DCHECK_EQ(sorted[i].overall_begin(), indices_begin + begin_offset);
+      DCHECK_EQ(sorted[i].overall_end(), indices_begin + end_offset);
+      DCHECK_EQ(sorted[i].non_null_count() + sorted[i].null_count(), batch.num_rows());
+      begin_offset = end_offset;
+      // XXX this is an upper bound on the true null count

Review comment:
       Remove XXX

##########
File path: cpp/src/arrow/compute/kernels/chunked_internal.h
##########
@@ -144,6 +144,19 @@ struct ChunkedArrayResolver : protected ChunkResolver {
         checked_cast<const ArrayType*>(chunks_[loc.chunk_index]), loc.index_in_chunk);
   }
 
+  template <typename ArrayType>
+  ResolvedChunk<ArrayType> Resolve(ChunkLocation loc) const {
+    return ResolvedChunk<ArrayType>(
+        checked_cast<const ArrayType*>(chunks_[loc.chunk_index]), loc.index_in_chunk);
+  }
+
+  template <typename ArrayType>

Review comment:
       I would suggest adding some documentation to these internal but exposed functions (Resolve, ResolveChunkLocation).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org