You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/06/14 03:06:06 UTC
[arrow] branch master updated: ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5b859fda1a ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)
5b859fda1a is described below
commit 5b859fda1ae5baec20a3ef54c279d9915e66dd98
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jun 13 17:05:59 2022 -1000
ARROW-16590: [C++] Consolidate files dealing with row-major storage (#13218)
The primary goal of this refactor of old code was to improve the readability and clarity of the code base. I did not make any functional changes to the code. If functional changes that modify existing code are suggested, I will happily discuss them here, but I will defer the changes themselves to follow-up PRs. I would very much appreciate any feedback on naming, on whether we have sufficient test coverage, and on the overall layout of the code.
* KeyRowArray -> RowTableImpl, KeyEncoder -> RowTableEncoder: The old names made sense because this data is currently represented physically as an array of rows. However, the data is conceptually tabular: we are storing rows and columns. In particular, I found it confusing that `KeyColumnArray` was a 1D data structure while `KeyRowArray` was a 2D table structure.
* KeyEncoder::Context -> LightContext: There's nothing specific to the key encoder here, and I worry that keeping it there may lead to fragmentation into many different "context" objects.
* Overall structure: I created a new folder, arrow/compute/row, and put all row-based utilities in it. Most of the files are now marked as _internal, and the content in these files is not used outside of arrow/compute/row. The grouper was previously located alongside the kernel code, where it didn't really belong, since it relies very heavily on the internal structure of the row encoding.
* Row structure: I documented the row structure in the file arrow/compute/row/row_internal.h.
Lead-authored-by: Weston Pace <we...@gmail.com>
Co-authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/CMakeLists.txt | 13 +-
cpp/src/arrow/compute/CMakeLists.txt | 1 +
cpp/src/arrow/compute/api.h | 6 +
cpp/src/arrow/compute/api_aggregate.h | 85 --
cpp/src/arrow/compute/exec/aggregate.cc | 239 ++++
cpp/src/arrow/compute/exec/aggregate.h | 60 +
cpp/src/arrow/compute/exec/aggregate_node.cc | 25 +-
cpp/src/arrow/compute/exec/hash_join.cc | 3 +-
cpp/src/arrow/compute/exec/key_encode.h | 502 ---------
cpp/src/arrow/compute/exec/key_hash.cc | 10 +-
cpp/src/arrow/compute/exec/key_hash.h | 10 +-
cpp/src/arrow/compute/exec/schema_util.h | 4 +-
.../arrow/compute/kernels/aggregate_benchmark.cc | 1 +
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 748 +------------
.../arrow/compute/kernels/hash_aggregate_test.cc | 57 +-
cpp/src/arrow/compute/light_array.h | 14 +
cpp/src/arrow/compute/row/CMakeLists.txt | 21 +
.../key_compare.cc => row/compare_internal.cc} | 50 +-
.../{exec/key_compare.h => row/compare_internal.h} | 94 +-
.../compare_internal_avx2.cc} | 33 +-
.../{exec/key_encode.cc => row/encode_internal.cc} | 1151 +++++++-------------
cpp/src/arrow/compute/row/encode_internal.h | 323 ++++++
.../encode_internal_avx2.cc} | 45 +-
cpp/src/arrow/compute/row/grouper.cc | 590 ++++++++++
cpp/src/arrow/compute/row/grouper.h | 112 ++
cpp/src/arrow/compute/row/row_internal.cc | 409 +++++++
cpp/src/arrow/compute/row/row_internal.h | 250 +++++
cpp/src/arrow/dataset/partition.cc | 9 +-
python/pyarrow/includes/libarrow.pxd | 2 +-
29 files changed, 2574 insertions(+), 2293 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index fd2f10db2f..6e348aaf43 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -380,6 +380,7 @@ if(ARROW_COMPUTE)
compute/api_vector.cc
compute/cast.cc
compute/exec.cc
+ compute/exec/aggregate.cc
compute/exec/aggregate_node.cc
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
@@ -389,8 +390,6 @@ if(ARROW_COMPUTE)
compute/exec/hash_join_dict.cc
compute/exec/hash_join_node.cc
compute/exec/ir_consumer.cc
- compute/exec/key_compare.cc
- compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/order_by_impl.cc
@@ -442,17 +441,21 @@ if(ARROW_COMPUTE)
compute/kernels/vector_nested.cc
compute/kernels/vector_replace.cc
compute/kernels/vector_selection.cc
- compute/kernels/vector_sort.cc)
+ compute/kernels/vector_sort.cc
+ compute/row/encode_internal.cc
+ compute/row/compare_internal.cc
+ compute/row/grouper.cc
+ compute/row/row_internal.cc)
append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)
append_avx2_src(compute/exec/bloom_filter_avx2.cc)
- append_avx2_src(compute/exec/key_compare_avx2.cc)
- append_avx2_src(compute/exec/key_encode_avx2.cc)
append_avx2_src(compute/exec/key_hash_avx2.cc)
append_avx2_src(compute/exec/key_map_avx2.cc)
append_avx2_src(compute/exec/util_avx2.cc)
+ append_avx2_src(compute/row/compare_internal_avx2.cc)
+ append_avx2_src(compute/row/encode_internal_avx2.cc)
list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc)
endif()
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 27e693d9a3..91fa796f6d 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -71,3 +71,4 @@ add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
add_subdirectory(kernels)
add_subdirectory(exec)
+add_subdirectory(row)
diff --git a/cpp/src/arrow/compute/api.h b/cpp/src/arrow/compute/api.h
index c8e0c2ee23..80582e47b7 100644
--- a/cpp/src/arrow/compute/api.h
+++ b/cpp/src/arrow/compute/api.h
@@ -46,3 +46,9 @@
/// @}
#include "arrow/compute/exec/options.h" // IWYU pragma: export
+
+/// \defgroup execnode-row Utilities for working with data in a row-major format
+/// @{
+/// @}
+
+#include "arrow/compute/row/grouper.h" // IWYU pragma: export
diff --git a/cpp/src/arrow/compute/api_aggregate.h b/cpp/src/arrow/compute/api_aggregate.h
index 1255a57ff1..7977889bf7 100644
--- a/cpp/src/arrow/compute/api_aggregate.h
+++ b/cpp/src/arrow/compute/api_aggregate.h
@@ -395,84 +395,6 @@ Result<Datum> Index(const Datum& value, const IndexOptions& options,
namespace internal {
-/// Internal use only: streaming group identifier.
-/// Consumes batches of keys and yields batches of the group ids.
-class ARROW_EXPORT Grouper {
- public:
- virtual ~Grouper() = default;
-
- /// Construct a Grouper which receives the specified key types
- static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
- ExecContext* ctx = default_exec_context());
-
- /// Consume a batch of keys, producing the corresponding group ids as an integer array.
- /// Currently only uint32 indices will be produced, eventually the bit width will only
- /// be as wide as necessary.
- virtual Result<Datum> Consume(const ExecBatch& batch) = 0;
-
- /// Get current unique keys. May be called multiple times.
- virtual Result<ExecBatch> GetUniques() = 0;
-
- /// Get the current number of groups.
- virtual uint32_t num_groups() const = 0;
-
- /// \brief Assemble lists of indices of identical elements.
- ///
- /// \param[in] ids An unsigned, all-valid integral array which will be
- /// used as grouping criteria.
- /// \param[in] num_groups An upper bound for the elements of ids
- /// \return A num_groups-long ListArray where the slot at i contains a
- /// list of indices where i appears in ids.
- ///
- /// MakeGroupings([
- /// 2,
- /// 2,
- /// 5,
- /// 5,
- /// 2,
- /// 3
- /// ], 8) == [
- /// [],
- /// [],
- /// [0, 1, 4],
- /// [5],
- /// [],
- /// [2, 3],
- /// [],
- /// []
- /// ]
- static Result<std::shared_ptr<ListArray>> MakeGroupings(
- const UInt32Array& ids, uint32_t num_groups,
- ExecContext* ctx = default_exec_context());
-
- /// \brief Produce a ListArray whose slots are selections of `array` which correspond to
- /// the provided groupings.
- ///
- /// For example,
- /// ApplyGroupings([
- /// [],
- /// [],
- /// [0, 1, 4],
- /// [5],
- /// [],
- /// [2, 3],
- /// [],
- /// []
- /// ], [2, 2, 5, 5, 2, 3]) == [
- /// [],
- /// [],
- /// [2, 2, 2],
- /// [3],
- /// [],
- /// [5, 5],
- /// [],
- /// []
- /// ]
- static Result<std::shared_ptr<ListArray>> ApplyGroupings(
- const ListArray& groupings, const Array& array,
- ExecContext* ctx = default_exec_context());
-};
-
/// \brief Configure a grouped aggregation
struct ARROW_EXPORT Aggregate {
/// the name of the aggregation function
@@ -482,13 +404,6 @@ struct ARROW_EXPORT Aggregate {
const FunctionOptions* options;
};
-/// Internal use only: helper function for testing HashAggregateKernels.
-/// This will be replaced by streaming execution operators.
-ARROW_EXPORT
-Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
- const std::vector<Aggregate>& aggregates, bool use_threads = false,
- ExecContext* ctx = default_exec_context());
-
} // namespace internal
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate.cc b/cpp/src/arrow/compute/exec/aggregate.cc
new file mode 100644
index 0000000000..1a81fdac3d
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/aggregate.cc
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/exec/aggregate.h"
+
+#include <mutex>
+
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
+#include "arrow/util/task_group.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+ ExecContext* ctx, const std::vector<Aggregate>& aggregates,
+ const std::vector<ValueDescr>& in_descrs) {
+ if (aggregates.size() != in_descrs.size()) {
+ return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
+ in_descrs.size(), " arguments were provided.");
+ }
+
+ std::vector<const HashAggregateKernel*> kernels(in_descrs.size());
+
+ for (size_t i = 0; i < aggregates.size(); ++i) {
+ ARROW_ASSIGN_OR_RAISE(auto function,
+ ctx->func_registry()->GetFunction(aggregates[i].function));
+ ARROW_ASSIGN_OR_RAISE(
+ const Kernel* kernel,
+ function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())}));
+ kernels[i] = static_cast<const HashAggregateKernel*>(kernel);
+ }
+ return kernels;
+}
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+ const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+ const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs) {
+ std::vector<std::unique_ptr<KernelState>> states(kernels.size());
+
+ for (size_t i = 0; i < aggregates.size(); ++i) {
+ auto options = aggregates[i].options;
+
+ if (options == nullptr) {
+ // use known default options for the named function if possible
+ auto maybe_function = ctx->func_registry()->GetFunction(aggregates[i].function);
+ if (maybe_function.ok()) {
+ options = maybe_function.ValueOrDie()->default_options();
+ }
+ }
+
+ KernelContext kernel_ctx{ctx};
+ ARROW_ASSIGN_OR_RAISE(
+ states[i],
+ kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i],
+ {
+ in_descrs[i],
+ ValueDescr::Array(uint32()),
+ },
+ options}));
+ }
+
+ return std::move(states);
+}
+
+Result<FieldVector> ResolveKernels(
+ const std::vector<Aggregate>& aggregates,
+ const std::vector<const HashAggregateKernel*>& kernels,
+ const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+ const std::vector<ValueDescr>& descrs) {
+ FieldVector fields(descrs.size());
+
+ for (size_t i = 0; i < kernels.size(); ++i) {
+ KernelContext kernel_ctx{ctx};
+ kernel_ctx.SetState(states[i].get());
+
+ ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve(
+ &kernel_ctx, {
+ descrs[i],
+ ValueDescr::Array(uint32()),
+ }));
+ fields[i] = field(aggregates[i].function, std::move(descr.type));
+ }
+ return fields;
+}
+
+Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+ const std::vector<Aggregate>& aggregates, bool use_threads,
+ ExecContext* ctx) {
+ auto task_group =
+ use_threads
+ ? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
+ : arrow::internal::TaskGroup::MakeSerial();
+
+ std::vector<const HashAggregateKernel*> kernels;
+ std::vector<std::vector<std::unique_ptr<KernelState>>> states;
+ FieldVector out_fields;
+
+ using arrow::compute::detail::ExecBatchIterator;
+ std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
+
+ if (!arguments.empty()) {
+ ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
+
+ // Construct and initialize HashAggregateKernels
+ auto argument_descrs = args_batch.GetDescriptors();
+
+ ARROW_ASSIGN_OR_RAISE(kernels, GetKernels(ctx, aggregates, argument_descrs));
+
+ states.resize(task_group->parallelism());
+ for (auto& state : states) {
+ ARROW_ASSIGN_OR_RAISE(state,
+ InitKernels(kernels, ctx, aggregates, argument_descrs));
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs));
+
+ ARROW_ASSIGN_OR_RAISE(
+ argument_batch_iterator,
+ ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
+ }
+
+ // Construct Groupers
+ ARROW_ASSIGN_OR_RAISE(ExecBatch keys_batch, ExecBatch::Make(keys));
+ auto key_descrs = keys_batch.GetDescriptors();
+
+ std::vector<std::unique_ptr<Grouper>> groupers(task_group->parallelism());
+ for (auto& grouper : groupers) {
+ ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx));
+ }
+
+ std::mutex mutex;
+ std::unordered_map<std::thread::id, size_t> thread_ids;
+
+ int i = 0;
+ for (ValueDescr& key_descr : key_descrs) {
+ out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type)));
+ }
+
+ ARROW_ASSIGN_OR_RAISE(
+ auto key_batch_iterator,
+ ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
+
+ // start "streaming" execution
+ ExecBatch key_batch, argument_batch;
+ while ((argument_batch_iterator == NULLPTR ||
+ argument_batch_iterator->Next(&argument_batch)) &&
+ key_batch_iterator->Next(&key_batch)) {
+ if (key_batch.length == 0) continue;
+
+ task_group->Append([&, key_batch, argument_batch] {
+ size_t thread_index;
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first;
+ thread_index = it->second;
+ DCHECK_LT(static_cast<int>(thread_index), task_group->parallelism());
+ }
+
+ auto grouper = groupers[thread_index].get();
+
+ // compute a batch of group ids
+ ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
+
+ // consume group ids with HashAggregateKernels
+ for (size_t i = 0; i < kernels.size(); ++i) {
+ KernelContext batch_ctx{ctx};
+ batch_ctx.SetState(states[thread_index][i].get());
+ ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
+ RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
+ RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
+ }
+
+ return Status::OK();
+ });
+ }
+
+ RETURN_NOT_OK(task_group->Finish());
+
+ // Merge if necessary
+ for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
+ ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
+ ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
+ groupers[thread_index].reset();
+
+ for (size_t idx = 0; idx < kernels.size(); ++idx) {
+ KernelContext batch_ctx{ctx};
+ batch_ctx.SetState(states[0][idx].get());
+
+ RETURN_NOT_OK(kernels[idx]->resize(&batch_ctx, groupers[0]->num_groups()));
+ RETURN_NOT_OK(kernels[idx]->merge(&batch_ctx, std::move(*states[thread_index][idx]),
+ *transposition.array()));
+ states[thread_index][idx].reset();
+ }
+ }
+
+ // Finalize output
+ ArrayDataVector out_data(arguments.size() + keys.size());
+ auto it = out_data.begin();
+
+ for (size_t idx = 0; idx < kernels.size(); ++idx) {
+ KernelContext batch_ctx{ctx};
+ batch_ctx.SetState(states[0][idx].get());
+ Datum out;
+ RETURN_NOT_OK(kernels[idx]->finalize(&batch_ctx, &out));
+ *it++ = out.array();
+ }
+
+ ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques());
+ for (const auto& key : out_keys.values) {
+ *it++ = key.array();
+ }
+
+ int64_t length = out_data[0]->length;
+ return ArrayData::Make(struct_(std::move(out_fields)), length,
+ {/*null_bitmap=*/nullptr}, std::move(out_data),
+ /*null_count=*/0);
+}
+
+} // namespace internal
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate.h b/cpp/src/arrow/compute/exec/aggregate.h
new file mode 100644
index 0000000000..2c62acf231
--- /dev/null
+++ b/cpp/src/arrow/compute/exec/aggregate.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/api_aggregate.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/exec_plan.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+/// Internal use only: helper function for testing HashAggregateKernels.
+/// For public use see arrow::compute::Grouper or create an execution plan
+/// and use an aggregate node.
+ARROW_EXPORT
+Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
+ const std::vector<Aggregate>& aggregates, bool use_threads = false,
+ ExecContext* ctx = default_exec_context());
+
+Result<std::vector<const HashAggregateKernel*>> GetKernels(
+ ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
+ const std::vector<ValueDescr>& in_descrs);
+
+Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
+ const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
+ const std::vector<internal::Aggregate>& aggregates,
+ const std::vector<ValueDescr>& in_descrs);
+
+Result<FieldVector> ResolveKernels(
+ const std::vector<internal::Aggregate>& aggregates,
+ const std::vector<const HashAggregateKernel*>& kernels,
+ const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
+ const std::vector<ValueDescr>& descrs);
+
+} // namespace internal
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc
index f6813ecb68..712515bd58 100644
--- a/cpp/src/arrow/compute/exec/aggregate_node.cc
+++ b/cpp/src/arrow/compute/exec/aggregate_node.cc
@@ -21,11 +21,13 @@
#include <unordered_map>
#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/aggregate.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
@@ -39,25 +41,6 @@ using internal::checked_cast;
namespace compute {
-namespace internal {
-
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
- ExecContext* ctx, const std::vector<internal::Aggregate>& aggregates,
- const std::vector<ValueDescr>& in_descrs);
-
-Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
- const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
- const std::vector<internal::Aggregate>& aggregates,
- const std::vector<ValueDescr>& in_descrs);
-
-Result<FieldVector> ResolveKernels(
- const std::vector<internal::Aggregate>& aggregates,
- const std::vector<const HashAggregateKernel*>& kernels,
- const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
- const std::vector<ValueDescr>& descrs);
-
-} // namespace internal
-
namespace {
void AggregatesToString(
@@ -647,7 +630,7 @@ class GroupByNode : public ExecNode {
private:
struct ThreadLocalState {
- std::unique_ptr<internal::Grouper> grouper;
+ std::unique_ptr<Grouper> grouper;
std::vector<std::unique_ptr<KernelState>> agg_states;
};
@@ -670,7 +653,7 @@ class GroupByNode : public ExecNode {
}
// Construct grouper
- ARROW_ASSIGN_OR_RAISE(state->grouper, internal::Grouper::Make(key_descrs, ctx_));
+ ARROW_ASSIGN_OR_RAISE(state->grouper, Grouper::Make(key_descrs, ctx_));
// Build vector of aggregate source field data types
std::vector<ValueDescr> agg_src_descrs(agg_kernels_.size());
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 391c8b2cd4..9c124674d7 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -29,13 +29,12 @@
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/row/encode_internal.h"
#include "arrow/util/tracing_internal.h"
namespace arrow {
namespace compute {
-using internal::RowEncoder;
-
class HashJoinBasicImpl : public HashJoinImpl {
private:
struct ThreadLocalState;
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
deleted file mode 100644
index 58d8fb233f..0000000000
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ /dev/null
@@ -1,502 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/array/data.h"
-#include "arrow/compute/exec.h"
-#include "arrow/compute/exec/util.h"
-#include "arrow/compute/light_array.h"
-#include "arrow/memory_pool.h"
-#include "arrow/result.h"
-#include "arrow/status.h"
-#include "arrow/util/bit_util.h"
-
-namespace arrow {
-namespace compute {
-
-/// Converts between key representation as a collection of arrays for
-/// individual columns and another representation as a single array of rows
-/// combining data from all columns into one value.
-/// This conversion is reversible.
-/// Row-oriented storage is beneficial when there is a need for random access
-/// of individual rows and at the same time all included columns are likely to
-/// be accessed together, as in the case of hash table key.
-class KeyEncoder {
- public:
- struct KeyEncoderContext {
- bool has_avx2() const {
- return (hardware_flags & arrow::internal::CpuInfo::AVX2) > 0;
- }
- int64_t hardware_flags;
- util::TempVectorStack* stack;
- };
-
- /// Description of a storage format for rows produced by encoder.
- struct KeyRowMetadata {
- /// Is row a varying-length binary, using offsets array to find a beginning of a row,
- /// or is it a fixed-length binary.
- bool is_fixed_length;
-
- /// For a fixed-length binary row, common size of rows in bytes,
- /// rounded up to the multiple of alignment.
- ///
- /// For a varying-length binary, size of all encoded fixed-length key columns,
- /// including lengths of varying-length columns, rounded up to the multiple of string
- /// alignment.
- uint32_t fixed_length;
-
- /// Offset within a row to the array of 32-bit offsets within a row of
- /// ends of varbinary fields.
- /// Used only when the row is not fixed-length, zero for fixed-length row.
- /// There are N elements for N varbinary fields.
- /// Each element is the offset within a row of the first byte after
- /// the corresponding varbinary field bytes in that row.
- /// If varbinary fields begin at aligned addresses, than the end of the previous
- /// varbinary field needs to be rounded up according to the specified alignment
- /// to obtain the beginning of the next varbinary field.
- /// The first varbinary field starts at offset specified by fixed_length,
- /// which should already be aligned.
- uint32_t varbinary_end_array_offset;
-
- /// Fixed number of bytes per row that are used to encode null masks.
- /// Null masks indicate for a single row which of its key columns are null.
- /// Nth bit in the sequence of bytes assigned to a row represents null
- /// information for Nth field according to the order in which they are encoded.
- int null_masks_bytes_per_row;
-
- /// Power of 2. Every row will start at the offset aligned to that number of bytes.
- int row_alignment;
-
- /// Power of 2. Must be no greater than row alignment.
- /// Every non-power-of-2 binary field and every varbinary field bytes
- /// will start aligned to that number of bytes.
- int string_alignment;
-
- /// Metadata of encoded columns in their original order.
- std::vector<KeyColumnMetadata> column_metadatas;
-
- /// Order in which fields are encoded.
- std::vector<uint32_t> column_order;
-
- /// Offsets within a row to fields in their encoding order.
- std::vector<uint32_t> column_offsets;
-
- /// Rounding up offset to the nearest multiple of alignment value.
- /// Alignment must be a power of 2.
- static inline uint32_t padding_for_alignment(uint32_t offset,
- int required_alignment) {
- ARROW_DCHECK(ARROW_POPCOUNT64(required_alignment) == 1);
- return static_cast<uint32_t>((-static_cast<int32_t>(offset)) &
- (required_alignment - 1));
- }
-
- /// Rounding up offset to the beginning of next column,
- /// chosing required alignment based on the data type of that column.
- static inline uint32_t padding_for_alignment(uint32_t offset, int string_alignment,
- const KeyColumnMetadata& col_metadata) {
- if (!col_metadata.is_fixed_length ||
- ARROW_POPCOUNT64(col_metadata.fixed_length) <= 1) {
- return 0;
- } else {
- return padding_for_alignment(offset, string_alignment);
- }
- }
-
- /// Returns an array of offsets within a row of ends of varbinary fields.
- inline const uint32_t* varbinary_end_array(const uint8_t* row) const {
- ARROW_DCHECK(!is_fixed_length);
- return reinterpret_cast<const uint32_t*>(row + varbinary_end_array_offset);
- }
- inline uint32_t* varbinary_end_array(uint8_t* row) const {
- ARROW_DCHECK(!is_fixed_length);
- return reinterpret_cast<uint32_t*>(row + varbinary_end_array_offset);
- }
-
- /// Returns the offset within the row and length of the first varbinary field.
- inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
- uint32_t* length) const {
- ARROW_DCHECK(!is_fixed_length);
- *offset = fixed_length;
- *length = varbinary_end_array(row)[0] - fixed_length;
- }
-
- /// Returns the offset within the row and length of the second and further varbinary
- /// fields.
- inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
- uint32_t* out_offset,
- uint32_t* out_length) const {
- ARROW_DCHECK(!is_fixed_length);
- ARROW_DCHECK(varbinary_id > 0);
- const uint32_t* varbinary_end = varbinary_end_array(row);
- uint32_t offset = varbinary_end[varbinary_id - 1];
- offset += padding_for_alignment(offset, string_alignment);
- *out_offset = offset;
- *out_length = varbinary_end[varbinary_id] - offset;
- }
-
- uint32_t encoded_field_order(uint32_t icol) const { return column_order[icol]; }
-
- uint32_t encoded_field_offset(uint32_t icol) const { return column_offsets[icol]; }
-
- uint32_t num_cols() const { return static_cast<uint32_t>(column_metadatas.size()); }
-
- uint32_t num_varbinary_cols() const;
-
- void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
- int in_row_alignment, int in_string_alignment);
-
- bool is_compatible(const KeyRowMetadata& other) const;
- };
-
- class KeyRowArray {
- public:
- KeyRowArray();
- Status Init(MemoryPool* pool, const KeyRowMetadata& metadata);
- void Clean();
- Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
- Status AppendSelectionFrom(const KeyRowArray& from, uint32_t num_rows_to_append,
- const uint16_t* source_row_ids);
- const KeyRowMetadata& metadata() const { return metadata_; }
- int64_t length() const { return num_rows_; }
- const uint8_t* data(int i) const {
- ARROW_DCHECK(i >= 0 && i <= max_buffers_);
- return buffers_[i];
- }
- uint8_t* mutable_data(int i) {
- ARROW_DCHECK(i >= 0 && i <= max_buffers_);
- return mutable_buffers_[i];
- }
- const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
- uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
- const uint8_t* null_masks() const { return null_masks_->data(); }
- uint8_t* null_masks() { return null_masks_->mutable_data(); }
-
- bool has_any_nulls(const KeyEncoderContext* ctx) const;
-
- private:
- Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
- Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
-
- int64_t size_null_masks(int64_t num_rows);
- int64_t size_offsets(int64_t num_rows);
- int64_t size_rows_fixed_length(int64_t num_rows);
- int64_t size_rows_varying_length(int64_t num_bytes);
- void update_buffer_pointers();
-
- static constexpr int64_t padding_for_vectors = 64;
- MemoryPool* pool_;
- KeyRowMetadata metadata_;
- /// Buffers can only expand during lifetime and never shrink.
- std::unique_ptr<ResizableBuffer> null_masks_;
- std::unique_ptr<ResizableBuffer> offsets_;
- std::unique_ptr<ResizableBuffer> rows_;
- static constexpr int max_buffers_ = 3;
- const uint8_t* buffers_[max_buffers_];
- uint8_t* mutable_buffers_[max_buffers_];
- int64_t num_rows_;
- int64_t rows_capacity_;
- int64_t bytes_capacity_;
-
- // Mutable to allow lazy evaluation
- mutable int64_t num_rows_for_has_any_nulls_;
- mutable bool has_any_nulls_;
- };
-
- void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
- int row_alignment, int string_alignment);
-
- const KeyRowMetadata& row_metadata() { return row_metadata_; }
-
- void PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
- const std::vector<KeyColumnArray>& cols);
- Status EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
- const uint16_t* selection);
-
- /// Decode a window of row oriented data into a corresponding
- /// window of column oriented storage.
- /// The output buffers need to be correctly allocated and sized before
- /// calling each method.
- /// For that reason decoding is split into two functions.
- /// The output of the first one, that processes everything except for
- /// varying length buffers, can be used to find out required varying
- /// length buffers sizes.
- void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t start_row_output,
- int64_t num_rows, const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols);
-
- void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t start_row_output,
- int64_t num_rows, const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols);
-
- const std::vector<KeyColumnArray>& GetBatchColumns() const { return batch_all_cols_; }
-
- private:
- /// Prepare column array vectors.
- /// Output column arrays represent a range of input column arrays
- /// specified by starting row and number of rows.
- /// Three vectors are generated:
- /// - all columns
- /// - fixed-length columns only
- /// - varying-length columns only
- void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
- const std::vector<KeyColumnArray>& cols_in);
-
- class TransformBoolean {
- public:
- static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
- const KeyColumnArray& temp);
- static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
- KeyEncoderContext* ctx);
- };
-
- class EncoderInteger {
- public:
- static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx, KeyColumnArray* temp);
- static bool UsesTransform(const KeyColumnArray& column);
- static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
- const KeyColumnArray& temp);
- static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
- KeyEncoderContext* ctx);
-
- private:
- static bool IsBoolean(const KeyColumnMetadata& metadata);
- };
-
- class EncoderBinary {
- public:
- static void EncodeSelected(uint32_t offset_within_row, KeyRowArray* rows,
- const KeyColumnArray& col, uint32_t num_selected,
- const uint16_t* selection);
- static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx, KeyColumnArray* temp);
- static bool IsInteger(const KeyColumnMetadata& metadata);
-
- private:
- template <class COPY_FN, class SET_NULL_FN>
- static void EncodeSelectedImp(uint32_t offset_within_row, KeyRowArray* rows,
- const KeyColumnArray& col, uint32_t num_selected,
- const uint16_t* selection, COPY_FN copy_fn,
- SET_NULL_FN set_null_fn);
-
- template <bool is_row_fixed_length, class COPY_FN>
- static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray* rows_const,
- KeyRowArray* rows_mutable_maybe_null,
- const KeyColumnArray* col_const,
- KeyColumnArray* col_mutable_maybe_null,
- COPY_FN copy_fn);
- template <bool is_row_fixed_length>
- static void DecodeImp(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row, const KeyRowArray& rows,
- KeyColumnArray* col);
-#if defined(ARROW_HAVE_AVX2)
- static void DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
- uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col);
- template <bool is_row_fixed_length>
- static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row, const KeyRowArray& rows,
- KeyColumnArray* col);
-#endif
- };
-
- class EncoderBinaryPair {
- public:
- static bool CanProcessPair(const KeyColumnMetadata& col1,
- const KeyColumnMetadata& col2) {
- return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
- }
- static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col1,
- KeyColumnArray* col2, KeyEncoderContext* ctx,
- KeyColumnArray* temp1, KeyColumnArray* temp2);
-
- private:
- template <bool is_row_fixed_length, typename col1_type, typename col2_type>
- static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
- uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col1,
- KeyColumnArray* col2);
-#if defined(ARROW_HAVE_AVX2)
- static uint32_t DecodeHelper_avx2(bool is_row_fixed_length, uint32_t col_width,
- uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row, const KeyRowArray& rows,
- KeyColumnArray* col1, KeyColumnArray* col2);
- template <bool is_row_fixed_length, uint32_t col_width>
- static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row, const KeyRowArray& rows,
- KeyColumnArray* col1, KeyColumnArray* col2);
-#endif
- };
-
- class EncoderOffsets {
- public:
- static void GetRowOffsetsSelected(KeyRowArray* rows,
- const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected, const uint16_t* selection);
- static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected, const uint16_t* selection);
-
- static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
- std::vector<KeyColumnArray>* varbinary_cols,
- const std::vector<uint32_t>& varbinary_cols_base_offset,
- KeyEncoderContext* ctx);
-
- private:
- template <bool has_nulls, bool is_first_varbinary>
- static void EncodeSelectedImp(uint32_t ivarbinary, KeyRowArray* rows,
- const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected, const uint16_t* selection);
- };
-
- class EncoderVarBinary {
- public:
- static void EncodeSelected(uint32_t ivarbinary, KeyRowArray* rows,
- const KeyColumnArray& cols, uint32_t num_selected,
- const uint16_t* selection);
-
- static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx);
-
- private:
- template <bool first_varbinary_col, class COPY_FN>
- static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id,
- const KeyRowArray* rows_const,
- KeyRowArray* rows_mutable_maybe_null,
- const KeyColumnArray* col_const,
- KeyColumnArray* col_mutable_maybe_null,
- COPY_FN copy_fn);
- template <bool first_varbinary_col>
- static void DecodeImp(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id, const KeyRowArray& rows,
- KeyColumnArray* col);
-#if defined(ARROW_HAVE_AVX2)
- static void DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id, const KeyRowArray& rows,
- KeyColumnArray* col);
- template <bool first_varbinary_col>
- static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id, const KeyRowArray& rows,
- KeyColumnArray* col);
-#endif
- };
-
- class EncoderNulls {
- public:
- static void EncodeSelected(KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected, const uint16_t* selection);
-
- static void Decode(uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols);
- };
-
- KeyEncoderContext* ctx_;
-
- // Data initialized once, based on data types of key columns
- KeyRowMetadata row_metadata_;
-
- // Data initialized for each input batch.
- // All elements are ordered according to the order of encoded fields in a row.
- std::vector<KeyColumnArray> batch_all_cols_;
- std::vector<KeyColumnArray> batch_varbinary_cols_;
- std::vector<uint32_t> batch_varbinary_cols_base_offsets_;
-};
-
-template <bool is_row_fixed_length, class COPY_FN>
-inline void KeyEncoder::EncoderBinary::DecodeHelper(
- uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
- const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
- COPY_FN copy_fn) {
- ARROW_DCHECK(col_const && col_const->metadata().is_fixed_length);
- uint32_t col_width = col_const->metadata().fixed_length;
-
- if (is_row_fixed_length) {
- uint32_t row_width = rows_const->metadata().fixed_length;
- for (uint32_t i = 0; i < num_rows; ++i) {
- const uint8_t* src;
- uint8_t* dst;
- src = rows_const->data(1) + row_width * (start_row + i) + offset_within_row;
- dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
- copy_fn(dst, src, col_width);
- }
- } else {
- const uint32_t* row_offsets = rows_const->offsets();
- for (uint32_t i = 0; i < num_rows; ++i) {
- const uint8_t* src;
- uint8_t* dst;
- src = rows_const->data(2) + row_offsets[start_row + i] + offset_within_row;
- dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
- copy_fn(dst, src, col_width);
- }
- }
-}
-
-template <bool first_varbinary_col, class COPY_FN>
-inline void KeyEncoder::EncoderVarBinary::DecodeHelper(
- uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
- const KeyRowArray* rows_const, KeyRowArray* rows_mutable_maybe_null,
- const KeyColumnArray* col_const, KeyColumnArray* col_mutable_maybe_null,
- COPY_FN copy_fn) {
- // Column and rows need to be varying length
- ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
- !col_const->metadata().is_fixed_length);
-
- const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
- const uint32_t* col_offsets = col_const->offsets();
-
- uint32_t col_offset_next = col_offsets[0];
- for (uint32_t i = 0; i < num_rows; ++i) {
- uint32_t col_offset = col_offset_next;
- col_offset_next = col_offsets[i + 1];
-
- uint32_t row_offset = row_offsets_for_batch[i];
- const uint8_t* row = rows_const->data(2) + row_offset;
-
- uint32_t offset_within_row;
- uint32_t length;
- if (first_varbinary_col) {
- rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
- &length);
- } else {
- rows_const->metadata().nth_varbinary_offset_and_length(row, varbinary_col_id,
- &offset_within_row, &length);
- }
-
- row_offset += offset_within_row;
-
- const uint8_t* src;
- uint8_t* dst;
- src = rows_const->data(2) + row_offset;
- dst = col_mutable_maybe_null->mutable_data(2) + col_offset;
- copy_fn(dst, src, length);
- }
-}
-
-} // namespace compute
-} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index e81ed64b6f..5a5d524c40 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -22,7 +22,7 @@
#include <algorithm>
#include <cstdint>
-#include "arrow/compute/exec/key_encode.h"
+#include "arrow/compute/light_array.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/ubsan.h"
@@ -377,7 +377,7 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t
}
void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
- KeyEncoder::KeyEncoderContext* ctx, uint32_t* hashes) {
+ LightContext* ctx, uint32_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
@@ -463,7 +463,7 @@ Status Hashing32::HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
std::vector<KeyColumnArray> column_arrays;
RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
- KeyEncoder::KeyEncoderContext ctx;
+ LightContext ctx;
ctx.hardware_flags = hardware_flags;
ctx.stack = temp_stack;
HashMultiColumn(column_arrays, &ctx, hashes);
@@ -814,7 +814,7 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_rows, uint64_t lengt
}
void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
- KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes) {
+ LightContext* ctx, uint64_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
constexpr uint32_t max_batch_size = util::MiniBatch::kMiniBatchLength;
@@ -895,7 +895,7 @@ Status Hashing64::HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
std::vector<KeyColumnArray> column_arrays;
RETURN_NOT_OK(ColumnArraysFromExecBatch(key_batch, offset, length, &column_arrays));
- KeyEncoder::KeyEncoderContext ctx;
+ LightContext ctx;
ctx.hardware_flags = hardware_flags;
ctx.stack = temp_stack;
HashMultiColumn(column_arrays, &ctx, hashes);
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 05f39bb729..f8af798838 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -23,8 +23,8 @@
#include <cstdint>
-#include "arrow/compute/exec/key_encode.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
namespace arrow {
namespace compute {
@@ -45,8 +45,8 @@ class ARROW_EXPORT Hashing32 {
friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
public:
- static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
- KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
+ static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
+ uint32_t* out_hash);
static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
@@ -157,8 +157,8 @@ class ARROW_EXPORT Hashing64 {
friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
public:
- static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
- KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
+ static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
+ uint64_t* hashes);
static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes,
int64_t hardware_flags, util::TempVectorStack* temp_stack,
diff --git a/cpp/src/arrow/compute/exec/schema_util.h b/cpp/src/arrow/compute/exec/schema_util.h
index 4e307e2380..91b7e6cfc6 100644
--- a/cpp/src/arrow/compute/exec/schema_util.h
+++ b/cpp/src/arrow/compute/exec/schema_util.h
@@ -22,8 +22,8 @@
#include <string>
#include <vector>
-#include "arrow/compute/exec/key_encode.h" // for KeyColumnMetadata
-#include "arrow/type.h" // for DataType, FieldRef, Field and Schema
+#include "arrow/compute/light_array.h" // for KeyColumnMetadata
+#include "arrow/type.h" // for DataType, FieldRef, Field and Schema
#include "arrow/util/mutex.h"
namespace arrow {
diff --git a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
index 230d9649de..c271285434 100644
--- a/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
+++ b/cpp/src/arrow/compute/kernels/aggregate_benchmark.cc
@@ -21,6 +21,7 @@
#include "arrow/array/array_primitive.h"
#include "arrow/compute/api.h"
+#include "arrow/compute/exec/aggregate.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/util/benchmark_util.h"
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index fc19ad4b7e..de63293595 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -29,8 +29,6 @@
#include "arrow/buffer_builder.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
-#include "arrow/compute/exec/key_compare.h"
-#include "arrow/compute/exec/key_encode.h"
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/key_map.h"
#include "arrow/compute/exec/util.h"
@@ -41,6 +39,7 @@
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/row_encoder.h"
#include "arrow/compute/kernels/util_internal.h"
+#include "arrow/compute/row/grouper.h"
#include "arrow/record_batch.h"
#include "arrow/stl_allocator.h"
#include "arrow/util/bit_run_reader.h"
@@ -65,485 +64,6 @@ namespace compute {
namespace internal {
namespace {
-struct GrouperImpl : Grouper {
- static Result<std::unique_ptr<GrouperImpl>> Make(const std::vector<ValueDescr>& keys,
- ExecContext* ctx) {
- auto impl = ::arrow::internal::make_unique<GrouperImpl>();
-
- impl->encoders_.resize(keys.size());
- impl->ctx_ = ctx;
-
- for (size_t i = 0; i < keys.size(); ++i) {
- const auto& key = keys[i].type;
-
- if (key->id() == Type::BOOL) {
- impl->encoders_[i] = ::arrow::internal::make_unique<BooleanKeyEncoder>();
- continue;
- }
-
- if (key->id() == Type::DICTIONARY) {
- impl->encoders_[i] =
- ::arrow::internal::make_unique<DictionaryKeyEncoder>(key, ctx->memory_pool());
- continue;
- }
-
- if (is_fixed_width(key->id())) {
- impl->encoders_[i] = ::arrow::internal::make_unique<FixedWidthKeyEncoder>(key);
- continue;
- }
-
- if (is_binary_like(key->id())) {
- impl->encoders_[i] =
- ::arrow::internal::make_unique<VarLengthKeyEncoder<BinaryType>>(key);
- continue;
- }
-
- if (is_large_binary_like(key->id())) {
- impl->encoders_[i] =
- ::arrow::internal::make_unique<VarLengthKeyEncoder<LargeBinaryType>>(key);
- continue;
- }
-
- if (key->id() == Type::NA) {
- impl->encoders_[i] = ::arrow::internal::make_unique<NullKeyEncoder>();
- continue;
- }
-
- return Status::NotImplemented("Keys of type ", *key);
- }
-
- return std::move(impl);
- }
-
- Result<Datum> Consume(const ExecBatch& batch) override {
- std::vector<int32_t> offsets_batch(batch.length + 1);
- for (int i = 0; i < batch.num_values(); ++i) {
- encoders_[i]->AddLength(batch[i], batch.length, offsets_batch.data());
- }
-
- int32_t total_length = 0;
- for (int64_t i = 0; i < batch.length; ++i) {
- auto total_length_before = total_length;
- total_length += offsets_batch[i];
- offsets_batch[i] = total_length_before;
- }
- offsets_batch[batch.length] = total_length;
-
- std::vector<uint8_t> key_bytes_batch(total_length);
- std::vector<uint8_t*> key_buf_ptrs(batch.length);
- for (int64_t i = 0; i < batch.length; ++i) {
- key_buf_ptrs[i] = key_bytes_batch.data() + offsets_batch[i];
- }
-
- for (int i = 0; i < batch.num_values(); ++i) {
- RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data()));
- }
-
- TypedBufferBuilder<uint32_t> group_ids_batch(ctx_->memory_pool());
- RETURN_NOT_OK(group_ids_batch.Resize(batch.length));
-
- for (int64_t i = 0; i < batch.length; ++i) {
- int32_t key_length = offsets_batch[i + 1] - offsets_batch[i];
- std::string key(
- reinterpret_cast<const char*>(key_bytes_batch.data() + offsets_batch[i]),
- key_length);
-
- auto it_success = map_.emplace(key, num_groups_);
- auto group_id = it_success.first->second;
-
- if (it_success.second) {
- // new key; update offsets and key_bytes
- ++num_groups_;
- // Skip if there are no keys
- if (key_length > 0) {
- auto next_key_offset = static_cast<int32_t>(key_bytes_.size());
- key_bytes_.resize(next_key_offset + key_length);
- offsets_.push_back(next_key_offset + key_length);
- memcpy(key_bytes_.data() + next_key_offset, key.c_str(), key_length);
- }
- }
-
- group_ids_batch.UnsafeAppend(group_id);
- }
-
- ARROW_ASSIGN_OR_RAISE(auto group_ids, group_ids_batch.Finish());
- return Datum(UInt32Array(batch.length, std::move(group_ids)));
- }
-
- uint32_t num_groups() const override { return num_groups_; }
-
- Result<ExecBatch> GetUniques() override {
- ExecBatch out({}, num_groups_);
-
- std::vector<uint8_t*> key_buf_ptrs(num_groups_);
- for (int64_t i = 0; i < num_groups_; ++i) {
- key_buf_ptrs[i] = key_bytes_.data() + offsets_[i];
- }
-
- out.values.resize(encoders_.size());
- for (size_t i = 0; i < encoders_.size(); ++i) {
- ARROW_ASSIGN_OR_RAISE(
- out.values[i],
- encoders_[i]->Decode(key_buf_ptrs.data(), static_cast<int32_t>(num_groups_),
- ctx_->memory_pool()));
- }
-
- return out;
- }
-
- ExecContext* ctx_;
- std::unordered_map<std::string, uint32_t> map_;
- std::vector<int32_t> offsets_ = {0};
- std::vector<uint8_t> key_bytes_;
- uint32_t num_groups_ = 0;
- std::vector<std::unique_ptr<KeyEncoder>> encoders_;
-};
-
-struct GrouperFastImpl : Grouper {
- static constexpr int kBitmapPaddingForSIMD = 64; // bits
- static constexpr int kPaddingForSIMD = 32; // bytes
-
- static bool CanUse(const std::vector<ValueDescr>& keys) {
-#if ARROW_LITTLE_ENDIAN
- for (size_t i = 0; i < keys.size(); ++i) {
- const auto& key = keys[i].type;
- if (is_large_binary_like(key->id())) {
- return false;
- }
- }
- return true;
-#else
- return false;
-#endif
- }
-
- static Result<std::unique_ptr<GrouperFastImpl>> Make(
- const std::vector<ValueDescr>& keys, ExecContext* ctx) {
- auto impl = ::arrow::internal::make_unique<GrouperFastImpl>();
- impl->ctx_ = ctx;
-
- RETURN_NOT_OK(impl->temp_stack_.Init(ctx->memory_pool(), 64 * minibatch_size_max_));
- impl->encode_ctx_.hardware_flags =
- arrow::internal::CpuInfo::GetInstance()->hardware_flags();
- impl->encode_ctx_.stack = &impl->temp_stack_;
-
- auto num_columns = keys.size();
- impl->col_metadata_.resize(num_columns);
- impl->key_types_.resize(num_columns);
- impl->dictionaries_.resize(num_columns);
- for (size_t icol = 0; icol < num_columns; ++icol) {
- const auto& key = keys[icol].type;
- if (key->id() == Type::DICTIONARY) {
- auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
- ARROW_DCHECK(bit_width % 8 == 0);
- impl->col_metadata_[icol] =
- arrow::compute::KeyColumnMetadata(true, bit_width / 8);
- } else if (key->id() == Type::BOOL) {
- impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(true, 0);
- } else if (is_fixed_width(key->id())) {
- impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(
- true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
- } else if (is_binary_like(key->id())) {
- impl->col_metadata_[icol] =
- arrow::compute::KeyColumnMetadata(false, sizeof(uint32_t));
- } else if (key->id() == Type::NA) {
- impl->col_metadata_[icol] =
- arrow::compute::KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
- } else {
- return Status::NotImplemented("Keys of type ", *key);
- }
- impl->key_types_[icol] = key;
- }
-
- impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_,
- /* row_alignment = */ sizeof(uint64_t),
- /* string_alignment = */ sizeof(uint64_t));
- RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
- RETURN_NOT_OK(
- impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
- impl->minibatch_size_ = impl->minibatch_size_min_;
- GrouperFastImpl* impl_ptr = impl.get();
- auto equal_func = [impl_ptr](
- int num_keys_to_compare, const uint16_t* selection_may_be_null,
- const uint32_t* group_ids, uint32_t* out_num_keys_mismatch,
- uint16_t* out_selection_mismatch) {
- arrow::compute::KeyCompare::CompareColumnsToRows(
- num_keys_to_compare, selection_may_be_null, group_ids, &impl_ptr->encode_ctx_,
- out_num_keys_mismatch, out_selection_mismatch,
- impl_ptr->encoder_.GetBatchColumns(), impl_ptr->rows_);
- };
- auto append_func = [impl_ptr](int num_keys, const uint16_t* selection) {
- RETURN_NOT_OK(impl_ptr->encoder_.EncodeSelected(&impl_ptr->rows_minibatch_,
- num_keys, selection));
- return impl_ptr->rows_.AppendSelectionFrom(impl_ptr->rows_minibatch_, num_keys,
- nullptr);
- };
- RETURN_NOT_OK(impl->map_.init(impl->encode_ctx_.hardware_flags, ctx->memory_pool(),
- impl->encode_ctx_.stack, impl->log_minibatch_max_,
- equal_func, append_func));
- impl->cols_.resize(num_columns);
- impl->minibatch_hashes_.resize(impl->minibatch_size_max_ +
- kPaddingForSIMD / sizeof(uint32_t));
-
- return std::move(impl);
- }
-
- ~GrouperFastImpl() { map_.cleanup(); }
-
- Result<Datum> Consume(const ExecBatch& batch) override {
- // ARROW-14027: broadcast scalar arguments for now
- for (int i = 0; i < batch.num_values(); i++) {
- if (batch.values[i].is_scalar()) {
- ExecBatch expanded = batch;
- for (int j = i; j < expanded.num_values(); j++) {
- if (expanded.values[j].is_scalar()) {
- ARROW_ASSIGN_OR_RAISE(
- expanded.values[j],
- MakeArrayFromScalar(*expanded.values[j].scalar(), expanded.length,
- ctx_->memory_pool()));
- }
- }
- return ConsumeImpl(expanded);
- }
- }
- return ConsumeImpl(batch);
- }
-
- Result<Datum> ConsumeImpl(const ExecBatch& batch) {
- int64_t num_rows = batch.length;
- int num_columns = batch.num_values();
- // Process dictionaries
- for (int icol = 0; icol < num_columns; ++icol) {
- if (key_types_[icol]->id() == Type::DICTIONARY) {
- auto data = batch[icol].array();
- auto dict = MakeArray(data->dictionary);
- if (dictionaries_[icol]) {
- if (!dictionaries_[icol]->Equals(dict)) {
- // TODO(bkietz) unify if necessary. For now, just error if any batch's
- // dictionary differs from the first we saw for this key
- return Status::NotImplemented("Unifying differing dictionaries");
- }
- } else {
- dictionaries_[icol] = std::move(dict);
- }
- }
- }
-
- std::shared_ptr<arrow::Buffer> group_ids;
- ARROW_ASSIGN_OR_RAISE(
- group_ids, AllocateBuffer(sizeof(uint32_t) * num_rows, ctx_->memory_pool()));
-
- for (int icol = 0; icol < num_columns; ++icol) {
- const uint8_t* non_nulls = NULLPTR;
- const uint8_t* fixedlen = NULLPTR;
- const uint8_t* varlen = NULLPTR;
-
- // Skip if the key's type is NULL
- if (key_types_[icol]->id() != Type::NA) {
- if (batch[icol].array()->buffers[0] != NULLPTR) {
- non_nulls = batch[icol].array()->buffers[0]->data();
- }
- fixedlen = batch[icol].array()->buffers[1]->data();
- if (!col_metadata_[icol].is_fixed_length) {
- varlen = batch[icol].array()->buffers[2]->data();
- }
- }
-
- int64_t offset = batch[icol].array()->offset;
-
- auto col_base = arrow::compute::KeyColumnArray(
- col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen);
-
- cols_[icol] = col_base.Slice(offset, num_rows);
- }
-
- // Split into smaller mini-batches
- //
- for (uint32_t start_row = 0; start_row < num_rows;) {
- uint32_t batch_size_next = std::min(static_cast<uint32_t>(minibatch_size_),
- static_cast<uint32_t>(num_rows) - start_row);
-
- // Encode
- rows_minibatch_.Clean();
- encoder_.PrepareEncodeSelected(start_row, batch_size_next, cols_);
-
- // Compute hash
- Hashing32::HashMultiColumn(encoder_.GetBatchColumns(), &encode_ctx_,
- minibatch_hashes_.data());
-
- // Map
- auto match_bitvector =
- util::TempVectorHolder<uint8_t>(&temp_stack_, (batch_size_next + 7) / 8);
- {
- auto local_slots = util::TempVectorHolder<uint8_t>(&temp_stack_, batch_size_next);
- map_.early_filter(batch_size_next, minibatch_hashes_.data(),
- match_bitvector.mutable_data(), local_slots.mutable_data());
- map_.find(batch_size_next, minibatch_hashes_.data(),
- match_bitvector.mutable_data(), local_slots.mutable_data(),
- reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row);
- }
- auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
- int num_ids;
- util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
- match_bitvector.mutable_data(), &num_ids,
- ids.mutable_data());
-
- RETURN_NOT_OK(map_.map_new_keys(
- num_ids, ids.mutable_data(), minibatch_hashes_.data(),
- reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row));
-
- start_row += batch_size_next;
-
- if (minibatch_size_ * 2 <= minibatch_size_max_) {
- minibatch_size_ *= 2;
- }
- }
-
- return Datum(UInt32Array(batch.length, std::move(group_ids)));
- }
-
- uint32_t num_groups() const override { return static_cast<uint32_t>(rows_.length()); }
-
- // Make sure padded buffers end up with the right logical size
-
- Result<std::shared_ptr<Buffer>> AllocatePaddedBitmap(int64_t length) {
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Buffer> buf,
- AllocateBitmap(length + kBitmapPaddingForSIMD, ctx_->memory_pool()));
- return SliceMutableBuffer(buf, 0, bit_util::BytesForBits(length));
- }
-
- Result<std::shared_ptr<Buffer>> AllocatePaddedBuffer(int64_t size) {
- ARROW_ASSIGN_OR_RAISE(
- std::shared_ptr<Buffer> buf,
- AllocateBuffer(size + kBitmapPaddingForSIMD, ctx_->memory_pool()));
- return SliceMutableBuffer(buf, 0, size);
- }
-
- Result<ExecBatch> GetUniques() override {
- auto num_columns = static_cast<uint32_t>(col_metadata_.size());
- int64_t num_groups = rows_.length();
-
- std::vector<std::shared_ptr<Buffer>> non_null_bufs(num_columns);
- std::vector<std::shared_ptr<Buffer>> fixedlen_bufs(num_columns);
- std::vector<std::shared_ptr<Buffer>> varlen_bufs(num_columns);
-
- for (size_t i = 0; i < num_columns; ++i) {
- if (col_metadata_[i].is_null_type) {
- uint8_t* non_nulls = NULLPTR;
- uint8_t* fixedlen = NULLPTR;
- cols_[i] = arrow::compute::KeyColumnArray(col_metadata_[i], num_groups, non_nulls,
- fixedlen, NULLPTR);
- continue;
- }
- ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
- if (col_metadata_[i].is_fixed_length && !col_metadata_[i].is_null_type) {
- if (col_metadata_[i].fixed_length == 0) {
- ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBitmap(num_groups));
- } else {
- ARROW_ASSIGN_OR_RAISE(
- fixedlen_bufs[i],
- AllocatePaddedBuffer(num_groups * col_metadata_[i].fixed_length));
- }
- } else {
- ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
- AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
- }
- cols_[i] = arrow::compute::KeyColumnArray(
- col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
- fixedlen_bufs[i]->mutable_data(), nullptr);
- }
-
- for (int64_t start_row = 0; start_row < num_groups;) {
- int64_t batch_size_next =
- std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
- encoder_.DecodeFixedLengthBuffers(start_row, start_row, batch_size_next, rows_,
- &cols_);
- start_row += batch_size_next;
- }
-
- if (!rows_.metadata().is_fixed_length) {
- for (size_t i = 0; i < num_columns; ++i) {
- if (!col_metadata_[i].is_fixed_length) {
- auto varlen_size =
- reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
- ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
- cols_[i] = arrow::compute::KeyColumnArray(
- col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
- fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
- }
- }
-
- for (int64_t start_row = 0; start_row < num_groups;) {
- int64_t batch_size_next =
- std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
- encoder_.DecodeVaryingLengthBuffers(start_row, start_row, batch_size_next, rows_,
- &cols_);
- start_row += batch_size_next;
- }
- }
-
- ExecBatch out({}, num_groups);
- out.values.resize(num_columns);
- for (size_t i = 0; i < num_columns; ++i) {
- if (col_metadata_[i].is_null_type) {
- out.values[i] = ArrayData::Make(null(), num_groups, {nullptr}, num_groups);
- continue;
- }
- auto valid_count = arrow::internal::CountSetBits(
- non_null_bufs[i]->data(), /*offset=*/0, static_cast<int64_t>(num_groups));
- int null_count = static_cast<int>(num_groups) - static_cast<int>(valid_count);
-
- if (col_metadata_[i].is_fixed_length) {
- out.values[i] = ArrayData::Make(
- key_types_[i], num_groups,
- {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i])}, null_count);
- } else {
- out.values[i] =
- ArrayData::Make(key_types_[i], num_groups,
- {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i]),
- std::move(varlen_bufs[i])},
- null_count);
- }
- }
-
- // Process dictionaries
- for (size_t icol = 0; icol < num_columns; ++icol) {
- if (key_types_[icol]->id() == Type::DICTIONARY) {
- if (dictionaries_[icol]) {
- out.values[icol].array()->dictionary = dictionaries_[icol]->data();
- } else {
- ARROW_ASSIGN_OR_RAISE(auto dict, MakeArrayOfNull(key_types_[icol], 0));
- out.values[icol].array()->dictionary = dict->data();
- }
- }
- }
-
- return out;
- }
-
- static constexpr int log_minibatch_max_ = 10;
- static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
- static constexpr int minibatch_size_min_ = 128;
- int minibatch_size_;
-
- ExecContext* ctx_;
- arrow::util::TempVectorStack temp_stack_;
- arrow::compute::KeyEncoder::KeyEncoderContext encode_ctx_;
-
- std::vector<std::shared_ptr<arrow::DataType>> key_types_;
- std::vector<arrow::compute::KeyColumnMetadata> col_metadata_;
- std::vector<arrow::compute::KeyColumnArray> cols_;
- std::vector<uint32_t> minibatch_hashes_;
-
- std::vector<std::shared_ptr<Array>> dictionaries_;
-
- arrow::compute::KeyEncoder::KeyRowArray rows_;
- arrow::compute::KeyEncoder::KeyRowArray rows_minibatch_;
- arrow::compute::KeyEncoder encoder_;
- arrow::compute::SwissTable map_;
-};
-
/// C++ abstract base class for the HashAggregateKernel interface.
/// Implementations should be default constructible and perform initialization in
/// Init().
@@ -3172,272 +2692,6 @@ struct GroupedListFactory {
};
} // namespace
-Result<std::vector<const HashAggregateKernel*>> GetKernels(
- ExecContext* ctx, const std::vector<Aggregate>& aggregates,
- const std::vector<ValueDescr>& in_descrs) {
- if (aggregates.size() != in_descrs.size()) {
- return Status::Invalid(aggregates.size(), " aggregate functions were specified but ",
- in_descrs.size(), " arguments were provided.");
- }
-
- std::vector<const HashAggregateKernel*> kernels(in_descrs.size());
-
- for (size_t i = 0; i < aggregates.size(); ++i) {
- ARROW_ASSIGN_OR_RAISE(auto function,
- ctx->func_registry()->GetFunction(aggregates[i].function));
- ARROW_ASSIGN_OR_RAISE(
- const Kernel* kernel,
- function->DispatchExact({in_descrs[i], ValueDescr::Array(uint32())}));
- kernels[i] = static_cast<const HashAggregateKernel*>(kernel);
- }
- return kernels;
-}
-
-Result<std::vector<std::unique_ptr<KernelState>>> InitKernels(
- const std::vector<const HashAggregateKernel*>& kernels, ExecContext* ctx,
- const std::vector<Aggregate>& aggregates, const std::vector<ValueDescr>& in_descrs) {
- std::vector<std::unique_ptr<KernelState>> states(kernels.size());
-
- for (size_t i = 0; i < aggregates.size(); ++i) {
- auto options = aggregates[i].options;
-
- if (options == nullptr) {
- // use known default options for the named function if possible
- auto maybe_function = ctx->func_registry()->GetFunction(aggregates[i].function);
- if (maybe_function.ok()) {
- options = maybe_function.ValueOrDie()->default_options();
- }
- }
-
- KernelContext kernel_ctx{ctx};
- ARROW_ASSIGN_OR_RAISE(
- states[i],
- kernels[i]->init(&kernel_ctx, KernelInitArgs{kernels[i],
- {
- in_descrs[i],
- ValueDescr::Array(uint32()),
- },
- options}));
- }
-
- return std::move(states);
-}
-
-Result<FieldVector> ResolveKernels(
- const std::vector<Aggregate>& aggregates,
- const std::vector<const HashAggregateKernel*>& kernels,
- const std::vector<std::unique_ptr<KernelState>>& states, ExecContext* ctx,
- const std::vector<ValueDescr>& descrs) {
- FieldVector fields(descrs.size());
-
- for (size_t i = 0; i < kernels.size(); ++i) {
- KernelContext kernel_ctx{ctx};
- kernel_ctx.SetState(states[i].get());
-
- ARROW_ASSIGN_OR_RAISE(auto descr, kernels[i]->signature->out_type().Resolve(
- &kernel_ctx, {
- descrs[i],
- ValueDescr::Array(uint32()),
- }));
- fields[i] = field(aggregates[i].function, std::move(descr.type));
- }
- return fields;
-}
-
-Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<ValueDescr>& descrs,
- ExecContext* ctx) {
- if (GrouperFastImpl::CanUse(descrs)) {
- return GrouperFastImpl::Make(descrs, ctx);
- }
- return GrouperImpl::Make(descrs, ctx);
-}
-
-Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
- const std::vector<Aggregate>& aggregates, bool use_threads,
- ExecContext* ctx) {
- auto task_group =
- use_threads
- ? arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool())
- : arrow::internal::TaskGroup::MakeSerial();
-
- std::vector<const HashAggregateKernel*> kernels;
- std::vector<std::vector<std::unique_ptr<KernelState>>> states;
- FieldVector out_fields;
-
- using arrow::compute::detail::ExecBatchIterator;
- std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
-
- if (!arguments.empty()) {
- ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
-
- // Construct and initialize HashAggregateKernels
- auto argument_descrs = args_batch.GetDescriptors();
-
- ARROW_ASSIGN_OR_RAISE(kernels, GetKernels(ctx, aggregates, argument_descrs));
-
- states.resize(task_group->parallelism());
- for (auto& state : states) {
- ARROW_ASSIGN_OR_RAISE(state,
- InitKernels(kernels, ctx, aggregates, argument_descrs));
- }
-
- ARROW_ASSIGN_OR_RAISE(
- out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_descrs));
-
- ARROW_ASSIGN_OR_RAISE(
- argument_batch_iterator,
- ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
- }
-
- // Construct Groupers
- ARROW_ASSIGN_OR_RAISE(ExecBatch keys_batch, ExecBatch::Make(keys));
- auto key_descrs = keys_batch.GetDescriptors();
-
- std::vector<std::unique_ptr<Grouper>> groupers(task_group->parallelism());
- for (auto& grouper : groupers) {
- ARROW_ASSIGN_OR_RAISE(grouper, Grouper::Make(key_descrs, ctx));
- }
-
- std::mutex mutex;
- std::unordered_map<std::thread::id, size_t> thread_ids;
-
- int i = 0;
- for (ValueDescr& key_descr : key_descrs) {
- out_fields.push_back(field("key_" + std::to_string(i++), std::move(key_descr.type)));
- }
-
- ARROW_ASSIGN_OR_RAISE(
- auto key_batch_iterator,
- ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
-
- // start "streaming" execution
- ExecBatch key_batch, argument_batch;
- while ((argument_batch_iterator == NULLPTR ||
- argument_batch_iterator->Next(&argument_batch)) &&
- key_batch_iterator->Next(&key_batch)) {
- if (key_batch.length == 0) continue;
-
- task_group->Append([&, key_batch, argument_batch] {
- size_t thread_index;
- {
- std::unique_lock<std::mutex> lock(mutex);
- auto it = thread_ids.emplace(std::this_thread::get_id(), thread_ids.size()).first;
- thread_index = it->second;
- DCHECK_LT(static_cast<int>(thread_index), task_group->parallelism());
- }
-
- auto grouper = groupers[thread_index].get();
-
- // compute a batch of group ids
- ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
-
- // consume group ids with HashAggregateKernels
- for (size_t i = 0; i < kernels.size(); ++i) {
- KernelContext batch_ctx{ctx};
- batch_ctx.SetState(states[thread_index][i].get());
- ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
- RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
- RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
- }
-
- return Status::OK();
- });
- }
-
- RETURN_NOT_OK(task_group->Finish());
-
- // Merge if necessary
- for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
- ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
- ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
- groupers[thread_index].reset();
-
- for (size_t idx = 0; idx < kernels.size(); ++idx) {
- KernelContext batch_ctx{ctx};
- batch_ctx.SetState(states[0][idx].get());
-
- RETURN_NOT_OK(kernels[idx]->resize(&batch_ctx, groupers[0]->num_groups()));
- RETURN_NOT_OK(kernels[idx]->merge(&batch_ctx, std::move(*states[thread_index][idx]),
- *transposition.array()));
- states[thread_index][idx].reset();
- }
- }
-
- // Finalize output
- ArrayDataVector out_data(arguments.size() + keys.size());
- auto it = out_data.begin();
-
- for (size_t idx = 0; idx < kernels.size(); ++idx) {
- KernelContext batch_ctx{ctx};
- batch_ctx.SetState(states[0][idx].get());
- Datum out;
- RETURN_NOT_OK(kernels[idx]->finalize(&batch_ctx, &out));
- *it++ = out.array();
- }
-
- ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, groupers[0]->GetUniques());
- for (const auto& key : out_keys.values) {
- *it++ = key.array();
- }
-
- int64_t length = out_data[0]->length;
- return ArrayData::Make(struct_(std::move(out_fields)), length,
- {/*null_bitmap=*/nullptr}, std::move(out_data),
- /*null_count=*/0);
-}
-
-Result<std::shared_ptr<ListArray>> Grouper::ApplyGroupings(const ListArray& groupings,
- const Array& array,
- ExecContext* ctx) {
- ARROW_ASSIGN_OR_RAISE(Datum sorted,
- compute::Take(array, groupings.data()->child_data[0],
- TakeOptions::NoBoundsCheck(), ctx));
-
- return std::make_shared<ListArray>(list(array.type()), groupings.length(),
- groupings.value_offsets(), sorted.make_array());
-}
-
-Result<std::shared_ptr<ListArray>> Grouper::MakeGroupings(const UInt32Array& ids,
- uint32_t num_groups,
- ExecContext* ctx) {
- if (ids.null_count() != 0) {
- return Status::Invalid("MakeGroupings with null ids");
- }
-
- ARROW_ASSIGN_OR_RAISE(auto offsets, AllocateBuffer(sizeof(int32_t) * (num_groups + 1),
- ctx->memory_pool()));
- auto raw_offsets = reinterpret_cast<int32_t*>(offsets->mutable_data());
-
- std::memset(raw_offsets, 0, offsets->size());
- for (int i = 0; i < ids.length(); ++i) {
- DCHECK_LT(ids.Value(i), num_groups);
- raw_offsets[ids.Value(i)] += 1;
- }
- int32_t length = 0;
- for (uint32_t id = 0; id < num_groups; ++id) {
- auto offset = raw_offsets[id];
- raw_offsets[id] = length;
- length += offset;
- }
- raw_offsets[num_groups] = length;
- DCHECK_EQ(ids.length(), length);
-
- ARROW_ASSIGN_OR_RAISE(auto offsets_copy,
- offsets->CopySlice(0, offsets->size(), ctx->memory_pool()));
- raw_offsets = reinterpret_cast<int32_t*>(offsets_copy->mutable_data());
-
- ARROW_ASSIGN_OR_RAISE(auto sort_indices, AllocateBuffer(sizeof(int32_t) * ids.length(),
- ctx->memory_pool()));
- auto raw_sort_indices = reinterpret_cast<int32_t*>(sort_indices->mutable_data());
- for (int i = 0; i < ids.length(); ++i) {
- raw_sort_indices[raw_offsets[ids.Value(i)]++] = i;
- }
-
- return std::make_shared<ListArray>(
- list(int32()), num_groups, std::move(offsets),
- std::make_shared<Int32Array>(ids.length(), std::move(sort_indices)));
-}
-
namespace {
const FunctionDoc hash_count_doc{
"Count the number of null / non-null values in each group",
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
index 932bda4891..2ed10b9b2b 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
@@ -33,6 +33,7 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/aggregate.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/test_util.h"
@@ -41,6 +42,7 @@
#include "arrow/compute/kernels/codegen_internal.h"
#include "arrow/compute/kernels/test_util.h"
#include "arrow/compute/registry.h"
+#include "arrow/compute/row/grouper.h"
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
@@ -73,14 +75,13 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
const std::vector<internal::Aggregate>& aggregates) {
ARROW_ASSIGN_OR_RAISE(auto key_batch, ExecBatch::Make(std::move(keys)));
- ARROW_ASSIGN_OR_RAISE(auto grouper,
- internal::Grouper::Make(key_batch.GetDescriptors()));
+ ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetDescriptors()));
ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
ARROW_ASSIGN_OR_RAISE(
- auto groupings, internal::Grouper::MakeGroupings(*id_batch.array_as<UInt32Array>(),
- grouper->num_groups()));
+ auto groupings,
+ Grouper::MakeGroupings(*id_batch.array_as<UInt32Array>(), grouper->num_groups()));
ArrayVector out_columns;
std::vector<std::string> out_names;
@@ -93,7 +94,7 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
ARROW_ASSIGN_OR_RAISE(
auto grouped_argument,
- internal::Grouper::ApplyGroupings(*groupings, *arguments[i].make_array()));
+ Grouper::ApplyGroupings(*groupings, *arguments[i].make_array()));
ScalarVector aggregated_scalars;
@@ -261,21 +262,21 @@ Result<Datum> GroupByTest(
} // namespace
TEST(Grouper, SupportedKeys) {
- ASSERT_OK(internal::Grouper::Make({boolean()}));
+ ASSERT_OK(Grouper::Make({boolean()}));
- ASSERT_OK(internal::Grouper::Make({int8(), uint16(), int32(), uint64()}));
+ ASSERT_OK(Grouper::Make({int8(), uint16(), int32(), uint64()}));
- ASSERT_OK(internal::Grouper::Make({dictionary(int64(), utf8())}));
+ ASSERT_OK(Grouper::Make({dictionary(int64(), utf8())}));
- ASSERT_OK(internal::Grouper::Make({float16(), float32(), float64()}));
+ ASSERT_OK(Grouper::Make({float16(), float32(), float64()}));
- ASSERT_OK(internal::Grouper::Make({utf8(), binary(), large_utf8(), large_binary()}));
+ ASSERT_OK(Grouper::Make({utf8(), binary(), large_utf8(), large_binary()}));
- ASSERT_OK(internal::Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)}));
+ ASSERT_OK(Grouper::Make({fixed_size_binary(16), fixed_size_binary(32)}));
- ASSERT_OK(internal::Grouper::Make({decimal128(32, 10), decimal256(76, 20)}));
+ ASSERT_OK(Grouper::Make({decimal128(32, 10), decimal256(76, 20)}));
- ASSERT_OK(internal::Grouper::Make({date32(), date64()}));
+ ASSERT_OK(Grouper::Make({date32(), date64()}));
for (auto unit : {
TimeUnit::SECOND,
@@ -283,29 +284,28 @@ TEST(Grouper, SupportedKeys) {
TimeUnit::MICRO,
TimeUnit::NANO,
}) {
- ASSERT_OK(internal::Grouper::Make({timestamp(unit), duration(unit)}));
+ ASSERT_OK(Grouper::Make({timestamp(unit), duration(unit)}));
}
- ASSERT_OK(internal::Grouper::Make(
- {day_time_interval(), month_interval(), month_day_nano_interval()}));
+ ASSERT_OK(
+ Grouper::Make({day_time_interval(), month_interval(), month_day_nano_interval()}));
- ASSERT_OK(internal::Grouper::Make({null()}));
+ ASSERT_OK(Grouper::Make({null()}));
- ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({field("", int64())})}));
+ ASSERT_RAISES(NotImplemented, Grouper::Make({struct_({field("", int64())})}));
- ASSERT_RAISES(NotImplemented, internal::Grouper::Make({struct_({})}));
+ ASSERT_RAISES(NotImplemented, Grouper::Make({struct_({})}));
- ASSERT_RAISES(NotImplemented, internal::Grouper::Make({list(int32())}));
+ ASSERT_RAISES(NotImplemented, Grouper::Make({list(int32())}));
- ASSERT_RAISES(NotImplemented, internal::Grouper::Make({fixed_size_list(int32(), 5)}));
+ ASSERT_RAISES(NotImplemented, Grouper::Make({fixed_size_list(int32(), 5)}));
- ASSERT_RAISES(NotImplemented,
- internal::Grouper::Make({dense_union({field("", int32())})}));
+ ASSERT_RAISES(NotImplemented, Grouper::Make({dense_union({field("", int32())})}));
}
struct TestGrouper {
explicit TestGrouper(std::vector<ValueDescr> descrs) : descrs_(std::move(descrs)) {
- grouper_ = internal::Grouper::Make(descrs_).ValueOrDie();
+ grouper_ = Grouper::Make(descrs_).ValueOrDie();
FieldVector fields;
for (const auto& descr : descrs_) {
@@ -423,7 +423,7 @@ struct TestGrouper {
std::vector<ValueDescr> descrs_;
std::shared_ptr<Schema> key_schema_;
- std::unique_ptr<internal::Grouper> grouper_;
+ std::unique_ptr<Grouper> grouper_;
ExecBatch uniques_ = ExecBatch({}, -1);
};
@@ -666,12 +666,11 @@ TEST(Grouper, MakeGroupings) {
auto expected = ArrayFromJSON(list(int32()), expected_json);
auto num_groups = static_cast<uint32_t>(expected->length());
- ASSERT_OK_AND_ASSIGN(auto actual, internal::Grouper::MakeGroupings(*ids, num_groups));
+ ASSERT_OK_AND_ASSIGN(auto actual, Grouper::MakeGroupings(*ids, num_groups));
AssertArraysEqual(*expected, *actual, /*verbose=*/true);
// validate ApplyGroupings
- ASSERT_OK_AND_ASSIGN(auto grouped_ids,
- internal::Grouper::ApplyGroupings(*actual, *ids));
+ ASSERT_OK_AND_ASSIGN(auto grouped_ids, Grouper::ApplyGroupings(*actual, *ids));
for (uint32_t group = 0; group < num_groups; ++group) {
auto ids_slice = checked_pointer_cast<UInt32Array>(grouped_ids->value_slice(group));
@@ -693,7 +692,7 @@ TEST(Grouper, MakeGroupings) {
auto ids = checked_pointer_cast<UInt32Array>(ArrayFromJSON(uint32(), "[0, null, 1]"));
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("MakeGroupings with null ids"),
- internal::Grouper::MakeGroupings(*ids, 5));
+ Grouper::MakeGroupings(*ids, 5));
}
TEST(Grouper, ScalarValues) {
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
index dd13aa0647..f0e5c70687 100644
--- a/cpp/src/arrow/compute/light_array.h
+++ b/cpp/src/arrow/compute/light_array.h
@@ -21,7 +21,9 @@
#include "arrow/array.h"
#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
#include "arrow/type.h"
+#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
/// This file contains lightweight containers for Arrow buffers. These containers
@@ -31,6 +33,18 @@
namespace arrow {
namespace compute {
+/// \brief Context needed by various execution engine operations
+///
+/// In the execution engine this context is provided by either the node or the
+/// plan and the context exists for the lifetime of the plan. Defining this here
+/// allows us to take advantage of these resources without coupling the logic with
+/// the execution engine.
+struct LightContext {
+ bool has_avx2() const { return (hardware_flags & arrow::internal::CpuInfo::AVX2) > 0; }
+ int64_t hardware_flags;
+ util::TempVectorStack* stack;
+};
+
/// \brief Description of the layout of a "key" column
///
/// A "key" column is a non-nested, non-union column.
diff --git a/cpp/src/arrow/compute/row/CMakeLists.txt b/cpp/src/arrow/compute/row/CMakeLists.txt
new file mode 100644
index 0000000000..6ae982dbaf
--- /dev/null
+++ b/cpp/src/arrow/compute/row/CMakeLists.txt
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Contains utilities for working with Arrow data been stored
+# in a row-major order.
+
+arrow_install_all_headers("arrow/compute/row")
diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/row/compare_internal.cc
similarity index 90%
rename from cpp/src/arrow/compute/exec/key_compare.cc
rename to cpp/src/arrow/compute/row/compare_internal.cc
index d873aec692..e863c9cd05 100644
--- a/cpp/src/arrow/compute/exec/key_compare.cc
+++ b/cpp/src/arrow/compute/row/compare_internal.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "arrow/compute/exec/key_compare.h"
+#include "arrow/compute/row/compare_internal.h"
#include <memory.h>
@@ -33,9 +33,8 @@ template <bool use_selection>
void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
uint8_t* match_bytevector) {
if (!rows.has_any_nulls(ctx) && !col.data(0)) {
return;
@@ -90,9 +89,8 @@ template <bool use_selection, class COMPARE_FN>
void KeyCompare::CompareBinaryColumnToRowHelper(
uint32_t offset_within_row, uint32_t first_row_to_compare,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector, COMPARE_FN compare_fn) {
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn) {
bool is_fixed_length = rows.metadata().is_fixed_length;
if (is_fixed_length) {
uint32_t fixed_length = rows.metadata().fixed_length;
@@ -118,11 +116,13 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
}
template <bool use_selection>
-void KeyCompare::CompareBinaryColumnToRow(
- uint32_t offset_within_row, uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+void KeyCompare::CompareBinaryColumnToRow(uint32_t offset_within_row,
+ uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_AVX2)
if (ctx->has_avx2()) {
@@ -228,11 +228,13 @@ void KeyCompare::CompareBinaryColumnToRow(
// Overwrites the match_bytevector instead of updating it
template <bool use_selection, bool is_first_varbinary_col>
-void KeyCompare::CompareVarBinaryColumnToRow(
- uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+void KeyCompare::CompareVarBinaryColumnToRow(uint32_t id_varbinary_col,
+ uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
#if defined(ARROW_HAVE_AVX2)
if (ctx->has_avx2()) {
CompareVarBinaryColumnToRow_avx2(
@@ -290,7 +292,7 @@ void KeyCompare::CompareVarBinaryColumnToRow(
}
}
-void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
+void KeyCompare::AndByteVectors(LightContext* ctx, uint32_t num_elements,
uint8_t* bytevector_A, const uint8_t* bytevector_B) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_AVX2)
@@ -306,11 +308,13 @@ void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num
}
}
-void KeyCompare::CompareColumnsToRows(
- uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
- const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows) {
+void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, uint32_t* out_num_rows,
+ uint16_t* out_sel_left_maybe_same,
+ const std::vector<KeyColumnArray>& cols,
+ const RowTableImpl& rows) {
if (num_rows_to_compare == 0) {
*out_num_rows = 0;
return;
diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/row/compare_internal.h
similarity index 52%
rename from cpp/src/arrow/compute/exec/key_compare.h
rename to cpp/src/arrow/compute/row/compare_internal.h
index 773b32d46c..e3b9057115 100644
--- a/cpp/src/arrow/compute/exec/key_compare.h
+++ b/cpp/src/arrow/compute/row/compare_internal.h
@@ -19,8 +19,10 @@
#include <cstdint>
-#include "arrow/compute/exec/key_encode.h"
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/encode_internal.h"
+#include "arrow/compute/row/row_internal.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
@@ -33,45 +35,48 @@ class KeyCompare {
// Returns a single 16-bit selection vector of rows that failed comparison.
// If there is input selection on the left, the resulting selection is a filtered image
// of input selection.
- static void CompareColumnsToRows(
- uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
- const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows);
+ static void CompareColumnsToRows(uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map, LightContext* ctx,
+ uint32_t* out_num_rows,
+ uint16_t* out_sel_left_maybe_same,
+ const std::vector<KeyColumnArray>& cols,
+ const RowTableImpl& rows);
private:
template <bool use_selection>
static void NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows,
+ const uint32_t* left_to_right_map, LightContext* ctx,
+ const KeyColumnArray& col, const RowTableImpl& rows,
uint8_t* match_bytevector);
template <bool use_selection, class COMPARE_FN>
static void CompareBinaryColumnToRowHelper(
uint32_t offset_within_row, uint32_t first_row_to_compare,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector, COMPARE_FN compare_fn);
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector, COMPARE_FN compare_fn);
template <bool use_selection>
- static void CompareBinaryColumnToRow(
- uint32_t offset_within_row, uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+ static void CompareBinaryColumnToRow(uint32_t offset_within_row,
+ uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
+ uint8_t* match_bytevector);
template <bool use_selection, bool is_first_varbinary_col>
- static void CompareVarBinaryColumnToRow(
- uint32_t id_varlen_col, uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
-
- static void AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
+ static void CompareVarBinaryColumnToRow(uint32_t id_varlen_col,
+ uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
+ uint8_t* match_bytevector);
+
+ static void AndByteVectors(LightContext* ctx, uint32_t num_elements,
uint8_t* bytevector_A, const uint8_t* bytevector_B);
#if defined(ARROW_HAVE_AVX2)
@@ -79,53 +84,52 @@ class KeyCompare {
template <bool use_selection>
static uint32_t NullUpdateColumnToRowImp_avx2(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector);
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector);
template <bool use_selection, class COMPARE8_FN>
static uint32_t CompareBinaryColumnToRowHelper_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
- COMPARE8_FN compare8_fn);
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector, COMPARE8_FN compare8_fn);
template <bool use_selection>
static uint32_t CompareBinaryColumnToRowImp_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector);
template <bool use_selection, bool is_first_varbinary_col>
static void CompareVarBinaryColumnToRowImp_avx2(
uint32_t id_varlen_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector);
static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A,
const uint8_t* bytevector_B);
- static uint32_t NullUpdateColumnToRow_avx2(
- bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+ static uint32_t NullUpdateColumnToRow_avx2(bool use_selection, uint32_t id_col,
+ uint32_t num_rows_to_compare,
+ const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map,
+ LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows,
+ uint8_t* match_bytevector);
static uint32_t CompareBinaryColumnToRow_avx2(
bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector);
static void CompareVarBinaryColumnToRow_avx2(
bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector);
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector);
#endif
};
diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
similarity index 96%
rename from cpp/src/arrow/compute/exec/key_compare_avx2.cc
rename to cpp/src/arrow/compute/row/compare_internal_avx2.cc
index e45486b2eb..818f4c4fe7 100644
--- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc
+++ b/cpp/src/arrow/compute/row/compare_internal_avx2.cc
@@ -17,7 +17,7 @@
#include <immintrin.h>
-#include "arrow/compute/exec/key_compare.h"
+#include "arrow/compute/row/compare_internal.h"
#include "arrow/util/bit_util.h"
namespace arrow {
@@ -39,9 +39,8 @@ inline __m256i set_first_n_bytes_avx2(int n) {
template <bool use_selection>
uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector) {
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector) {
if (!rows.has_any_nulls(ctx) && !col.data(0)) {
return num_rows_to_compare;
}
@@ -180,9 +179,8 @@ template <bool use_selection, class COMPARE8_FN>
uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
- COMPARE8_FN compare8_fn) {
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector, COMPARE8_FN compare8_fn) {
bool is_fixed_length = rows.metadata().is_fixed_length;
if (is_fixed_length) {
uint32_t fixed_length = rows.metadata().fixed_length;
@@ -419,8 +417,8 @@ template <bool use_selection>
uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
uint32_t col_width = col.metadata().fixed_length;
if (col_width == 0) {
int bit_offset = col.bit_offset(1);
@@ -503,8 +501,8 @@ template <bool use_selection, bool is_first_varbinary_col>
void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
const uint32_t* offsets_left = col.offsets();
const uint32_t* offsets_right = rows.offsets();
const uint8_t* rows_left = col.data(2);
@@ -569,8 +567,8 @@ uint32_t KeyCompare::AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevec
uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
if (use_selection) {
return NullUpdateColumnToRowImp_avx2<true>(id_col, num_rows_to_compare,
sel_left_maybe_null, left_to_right_map,
@@ -585,8 +583,8 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
- const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
+ LightContext* ctx, const KeyColumnArray& col, const RowTableImpl& rows,
+ uint8_t* match_bytevector) {
if (use_selection) {
return CompareBinaryColumnToRowImp_avx2<true>(offset_within_row, num_rows_to_compare,
sel_left_maybe_null, left_to_right_map,
@@ -601,9 +599,8 @@ uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
void KeyCompare::CompareVarBinaryColumnToRow_avx2(
bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
- uint8_t* match_bytevector) {
+ const uint32_t* left_to_right_map, LightContext* ctx, const KeyColumnArray& col,
+ const RowTableImpl& rows, uint8_t* match_bytevector) {
if (use_selection) {
if (is_first_varbinary_col) {
CompareVarBinaryColumnToRowImp_avx2<true, true>(
diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/row/encode_internal.cc
similarity index 56%
rename from cpp/src/arrow/compute/exec/key_encode.cc
rename to cpp/src/arrow/compute/row/encode_internal.cc
index 3d92c77b09..cbfd169b44 100644
--- a/cpp/src/arrow/compute/exec/key_encode.cc
+++ b/cpp/src/arrow/compute/row/encode_internal.cc
@@ -15,318 +15,223 @@
// specific language governing permissions and limitations
// under the License.
-#include "arrow/compute/exec/key_encode.h"
-
-#include <memory.h>
-
-#include <algorithm>
-
-#include "arrow/compute/exec/util.h"
-#include "arrow/util/bit_util.h"
-#include "arrow/util/ubsan.h"
+#include "arrow/compute/row/encode_internal.h"
namespace arrow {
namespace compute {
-KeyEncoder::KeyRowArray::KeyRowArray()
- : pool_(nullptr), rows_capacity_(0), bytes_capacity_(0) {}
-
-Status KeyEncoder::KeyRowArray::Init(MemoryPool* pool, const KeyRowMetadata& metadata) {
- pool_ = pool;
- metadata_ = metadata;
-
- DCHECK(!null_masks_ && !offsets_ && !rows_);
-
- constexpr int64_t rows_capacity = 8;
- constexpr int64_t bytes_capacity = 1024;
-
- // Null masks
- ARROW_ASSIGN_OR_RAISE(auto null_masks,
- AllocateResizableBuffer(size_null_masks(rows_capacity), pool_));
- null_masks_ = std::move(null_masks);
- memset(null_masks_->mutable_data(), 0, size_null_masks(rows_capacity));
-
- // Offsets and rows
- if (!metadata.is_fixed_length) {
- ARROW_ASSIGN_OR_RAISE(auto offsets,
- AllocateResizableBuffer(size_offsets(rows_capacity), pool_));
- offsets_ = std::move(offsets);
- memset(offsets_->mutable_data(), 0, size_offsets(rows_capacity));
- reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
-
- ARROW_ASSIGN_OR_RAISE(
- auto rows,
- AllocateResizableBuffer(size_rows_varying_length(bytes_capacity), pool_));
- rows_ = std::move(rows);
- memset(rows_->mutable_data(), 0, size_rows_varying_length(bytes_capacity));
- bytes_capacity_ = size_rows_varying_length(bytes_capacity) - padding_for_vectors;
- } else {
- ARROW_ASSIGN_OR_RAISE(
- auto rows, AllocateResizableBuffer(size_rows_fixed_length(rows_capacity), pool_));
- rows_ = std::move(rows);
- memset(rows_->mutable_data(), 0, size_rows_fixed_length(rows_capacity));
- bytes_capacity_ = size_rows_fixed_length(rows_capacity) - padding_for_vectors;
- }
-
- update_buffer_pointers();
-
- rows_capacity_ = rows_capacity;
-
- num_rows_ = 0;
- num_rows_for_has_any_nulls_ = 0;
- has_any_nulls_ = false;
-
- return Status::OK();
-}
-
-void KeyEncoder::KeyRowArray::Clean() {
- num_rows_ = 0;
- num_rows_for_has_any_nulls_ = 0;
- has_any_nulls_ = false;
-
- if (!metadata_.is_fixed_length) {
- reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
- }
-}
-
-int64_t KeyEncoder::KeyRowArray::size_null_masks(int64_t num_rows) {
- return num_rows * metadata_.null_masks_bytes_per_row + padding_for_vectors;
-}
-
-int64_t KeyEncoder::KeyRowArray::size_offsets(int64_t num_rows) {
- return (num_rows + 1) * sizeof(uint32_t) + padding_for_vectors;
+void RowTableEncoder::Init(const std::vector<KeyColumnMetadata>& cols, LightContext* ctx,
+ int row_alignment, int string_alignment) {
+ ctx_ = ctx;
+ row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
+ uint32_t num_cols = row_metadata_.num_cols();
+ uint32_t num_varbinary_cols = row_metadata_.num_varbinary_cols();
+ batch_all_cols_.resize(num_cols);
+ batch_varbinary_cols_.resize(num_varbinary_cols);
+ batch_varbinary_cols_base_offsets_.resize(num_varbinary_cols);
}
-int64_t KeyEncoder::KeyRowArray::size_rows_fixed_length(int64_t num_rows) {
- return num_rows * metadata_.fixed_length + padding_for_vectors;
-}
+void RowTableEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+ const std::vector<KeyColumnArray>& cols_in) {
+ const auto num_cols = static_cast<uint32_t>(cols_in.size());
+ DCHECK(batch_all_cols_.size() == num_cols);
-int64_t KeyEncoder::KeyRowArray::size_rows_varying_length(int64_t num_bytes) {
- return num_bytes + padding_for_vectors;
-}
+ uint32_t num_varbinary_visited = 0;
+ for (uint32_t i = 0; i < num_cols; ++i) {
+ const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
+ KeyColumnArray col_window = col.Slice(start_row, num_rows);
-void KeyEncoder::KeyRowArray::update_buffer_pointers() {
- buffers_[0] = mutable_buffers_[0] = null_masks_->mutable_data();
- if (metadata_.is_fixed_length) {
- buffers_[1] = mutable_buffers_[1] = rows_->mutable_data();
- buffers_[2] = mutable_buffers_[2] = nullptr;
- } else {
- buffers_[1] = mutable_buffers_[1] = offsets_->mutable_data();
- buffers_[2] = mutable_buffers_[2] = rows_->mutable_data();
+ batch_all_cols_[i] = col_window;
+ if (!col.metadata().is_fixed_length) {
+ DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
+ // If start row is zero, then base offset of varbinary column is also zero.
+ if (start_row == 0) {
+ batch_varbinary_cols_base_offsets_[num_varbinary_visited] = 0;
+ } else {
+ batch_varbinary_cols_base_offsets_[num_varbinary_visited] =
+ col.offsets()[start_row];
+ }
+ batch_varbinary_cols_[num_varbinary_visited++] = col_window;
+ }
}
}
-Status KeyEncoder::KeyRowArray::ResizeFixedLengthBuffers(int64_t num_extra_rows) {
- if (rows_capacity_ >= num_rows_ + num_extra_rows) {
- return Status::OK();
- }
-
- int64_t rows_capacity_new = std::max(static_cast<int64_t>(1), 2 * rows_capacity_);
- while (rows_capacity_new < num_rows_ + num_extra_rows) {
- rows_capacity_new *= 2;
- }
+void RowTableEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
+ int64_t start_row_output, int64_t num_rows,
+ const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols) {
+ // Prepare column array vectors
+ PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
- // Null masks
- RETURN_NOT_OK(null_masks_->Resize(size_null_masks(rows_capacity_new), false));
- memset(null_masks_->mutable_data() + size_null_masks(rows_capacity_), 0,
- size_null_masks(rows_capacity_new) - size_null_masks(rows_capacity_));
+ // Create two temp vectors with 16-bit elements
+ auto temp_buffer_holder_A =
+ util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
+ auto temp_buffer_A = KeyColumnArray(
+ KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
+ reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
+ auto temp_buffer_holder_B =
+ util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
+ auto temp_buffer_B = KeyColumnArray(
+ KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
+ reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
- // Either offsets or rows
- if (!metadata_.is_fixed_length) {
- RETURN_NOT_OK(offsets_->Resize(size_offsets(rows_capacity_new), false));
- memset(offsets_->mutable_data() + size_offsets(rows_capacity_), 0,
- size_offsets(rows_capacity_new) - size_offsets(rows_capacity_));
- } else {
- RETURN_NOT_OK(rows_->Resize(size_rows_fixed_length(rows_capacity_new), false));
- memset(rows_->mutable_data() + size_rows_fixed_length(rows_capacity_), 0,
- size_rows_fixed_length(rows_capacity_new) -
- size_rows_fixed_length(rows_capacity_));
- bytes_capacity_ = size_rows_fixed_length(rows_capacity_new) - padding_for_vectors;
+ bool is_row_fixed_length = row_metadata_.is_fixed_length;
+ if (!is_row_fixed_length) {
+ EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input),
+ static_cast<uint32_t>(num_rows), rows, &batch_varbinary_cols_,
+ batch_varbinary_cols_base_offsets_, ctx_);
}
- update_buffer_pointers();
-
- rows_capacity_ = rows_capacity_new;
+ // Process fixed length columns
+ const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
+ for (uint32_t i = 0; i < num_cols;) {
+ if (!batch_all_cols_[i].metadata().is_fixed_length ||
+ batch_all_cols_[i].metadata().is_null_type) {
+ i += 1;
+ continue;
+ }
+ bool can_process_pair =
+ (i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
+ EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
+ batch_all_cols_[i + 1].metadata());
+ if (!can_process_pair) {
+ EncoderBinary::Decode(static_cast<uint32_t>(start_row_input),
+ static_cast<uint32_t>(num_rows),
+ row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
+ ctx_, &temp_buffer_A);
+ i += 1;
+ } else {
+ EncoderBinaryPair::Decode(
+ static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
+ row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
+ &batch_all_cols_[i + 1], ctx_, &temp_buffer_A, &temp_buffer_B);
+ i += 2;
+ }
+ }
- return Status::OK();
+ // Process nulls
+ EncoderNulls::Decode(static_cast<uint32_t>(start_row_input),
+ static_cast<uint32_t>(num_rows), rows, &batch_all_cols_);
}
-Status KeyEncoder::KeyRowArray::ResizeOptionalVaryingLengthBuffer(
- int64_t num_extra_bytes) {
- int64_t num_bytes = offsets()[num_rows_];
- if (bytes_capacity_ >= num_bytes + num_extra_bytes || metadata_.is_fixed_length) {
- return Status::OK();
- }
+void RowTableEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
+ int64_t start_row_output,
+ int64_t num_rows,
+ const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols) {
+ // Prepare column array vectors
+ PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
- int64_t bytes_capacity_new = std::max(static_cast<int64_t>(1), 2 * bytes_capacity_);
- while (bytes_capacity_new < num_bytes + num_extra_bytes) {
- bytes_capacity_new *= 2;
+ bool is_row_fixed_length = row_metadata_.is_fixed_length;
+ if (!is_row_fixed_length) {
+ for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
+ // Memcpy varbinary fields into precomputed in the previous step
+ // positions in the output row buffer.
+ EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
+ static_cast<uint32_t>(num_rows), static_cast<uint32_t>(i),
+ rows, &batch_varbinary_cols_[i], ctx_);
+ }
}
+}
- RETURN_NOT_OK(rows_->Resize(size_rows_varying_length(bytes_capacity_new), false));
- memset(rows_->mutable_data() + size_rows_varying_length(bytes_capacity_), 0,
- size_rows_varying_length(bytes_capacity_new) -
- size_rows_varying_length(bytes_capacity_));
-
- update_buffer_pointers();
+void RowTableEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
+ const std::vector<KeyColumnArray>& cols) {
+ // Prepare column array vectors
+ PrepareKeyColumnArrays(start_row, num_rows, cols);
+}
- bytes_capacity_ = bytes_capacity_new;
+Status RowTableEncoder::EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
+ const uint16_t* selection) {
+ rows->Clean();
+ RETURN_NOT_OK(
+ rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));
- return Status::OK();
-}
+ EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
+ selection);
-Status KeyEncoder::KeyRowArray::AppendSelectionFrom(const KeyRowArray& from,
- uint32_t num_rows_to_append,
- const uint16_t* source_row_ids) {
- DCHECK(metadata_.is_compatible(from.metadata()));
-
- RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
-
- if (!metadata_.is_fixed_length) {
- // Varying-length rows
- auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
- auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
- uint32_t total_length = to_offsets[num_rows_];
- uint32_t total_length_to_append = 0;
- for (uint32_t i = 0; i < num_rows_to_append; ++i) {
- uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
- uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
- total_length_to_append += length;
- to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
- }
+ RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
+ static_cast<uint32_t>(rows->offsets()[num_selected])));
- RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
-
- const uint8_t* src = from.rows_->data();
- uint8_t* dst = rows_->mutable_data() + total_length;
- for (uint32_t i = 0; i < num_rows_to_append; ++i) {
- uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
- uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
- auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
- auto dst64 = reinterpret_cast<uint64_t*>(dst);
- for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
- dst64[j] = src64[j];
- }
- dst += length;
- }
- } else {
- // Fixed-length rows
- const uint8_t* src = from.rows_->data();
- uint8_t* dst = rows_->mutable_data() + num_rows_ * metadata_.fixed_length;
- for (uint32_t i = 0; i < num_rows_to_append; ++i) {
- uint16_t row_id = source_row_ids ? source_row_ids[i] : i;
- uint32_t length = metadata_.fixed_length;
- auto src64 = reinterpret_cast<const uint64_t*>(src + length * row_id);
- auto dst64 = reinterpret_cast<uint64_t*>(dst);
- for (uint32_t j = 0; j < bit_util::CeilDiv(length, 8); ++j) {
- dst64[j] = src64[j];
- }
- dst += length;
+ for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
+ if (batch_all_cols_[icol].metadata().is_fixed_length) {
+ uint32_t offset_within_row = rows->metadata().column_offsets[icol];
+ EncoderBinary::EncodeSelected(offset_within_row, rows, batch_all_cols_[icol],
+ num_selected, selection);
}
}
- // Null masks
- uint32_t byte_length = metadata_.null_masks_bytes_per_row;
- uint64_t dst_byte_offset = num_rows_ * byte_length;
- const uint8_t* src_base = from.null_masks_->data();
- uint8_t* dst_base = null_masks_->mutable_data();
- for (uint32_t i = 0; i < num_rows_to_append; ++i) {
- uint32_t row_id = source_row_ids ? source_row_ids[i] : i;
- int64_t src_byte_offset = row_id * byte_length;
- const uint8_t* src = src_base + src_byte_offset;
- uint8_t* dst = dst_base + dst_byte_offset;
- for (uint32_t ibyte = 0; ibyte < byte_length; ++ibyte) {
- dst[ibyte] = src[ibyte];
- }
- dst_byte_offset += byte_length;
+ EncoderOffsets::EncodeSelected(rows, batch_varbinary_cols_, num_selected, selection);
+
+ for (size_t icol = 0; icol < batch_varbinary_cols_.size(); ++icol) {
+ EncoderVarBinary::EncodeSelected(static_cast<uint32_t>(icol), rows,
+ batch_varbinary_cols_[icol], num_selected,
+ selection);
}
- num_rows_ += num_rows_to_append;
+ EncoderNulls::EncodeSelected(rows, batch_all_cols_, num_selected, selection);
return Status::OK();
}
-Status KeyEncoder::KeyRowArray::AppendEmpty(uint32_t num_rows_to_append,
- uint32_t num_extra_bytes_to_append) {
- RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
- RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
- num_rows_ += num_rows_to_append;
- if (metadata_.row_alignment > 1 || metadata_.string_alignment > 1) {
- memset(rows_->mutable_data(), 0, bytes_capacity_);
+namespace {
+struct TransformBoolean {
+ static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp) {
+ // Make sure that the temp buffer is large enough
+ DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
+ temp.metadata().fixed_length >= sizeof(uint8_t));
+ KeyColumnMetadata metadata;
+ metadata.is_fixed_length = true;
+ metadata.fixed_length = sizeof(uint8_t);
+ constexpr int buffer_index = 1;
+ return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
}
- return Status::OK();
-}
-bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const {
- if (has_any_nulls_) {
- return true;
- }
- if (num_rows_for_has_any_nulls_ < num_rows_) {
- auto size_per_row = metadata().null_masks_bytes_per_row;
- has_any_nulls_ = !util::bit_util::are_all_bytes_zero(
- ctx->hardware_flags, null_masks() + size_per_row * num_rows_for_has_any_nulls_,
- static_cast<uint32_t>(size_per_row * (num_rows_ - num_rows_for_has_any_nulls_)));
- num_rows_for_has_any_nulls_ = num_rows_;
- }
- return has_any_nulls_;
-}
+ static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+ LightContext* ctx) {
+ // Make sure that metadata and lengths are compatible.
+ DCHECK(output->metadata().is_fixed_length == input.metadata().is_fixed_length);
+ DCHECK(output->metadata().fixed_length == 0 && input.metadata().fixed_length == 1);
+ DCHECK(output->length() == input.length());
+ constexpr int buffer_index = 1;
+ DCHECK(input.data(buffer_index) != nullptr);
+ DCHECK(output->mutable_data(buffer_index) != nullptr);
-KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(const KeyColumnArray& column,
- const KeyColumnArray& temp) {
- // Make sure that the temp buffer is large enough
- DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
- temp.metadata().fixed_length >= sizeof(uint8_t));
- KeyColumnMetadata metadata;
- metadata.is_fixed_length = true;
- metadata.fixed_length = sizeof(uint8_t);
- constexpr int buffer_index = 1;
- return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
-}
+ util::bit_util::bytes_to_bits(
+ ctx->hardware_flags, static_cast<int>(input.length()), input.data(buffer_index),
+ output->mutable_data(buffer_index), output->bit_offset(buffer_index));
+ }
+};
-void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
- KeyColumnArray* output,
- KeyEncoderContext* ctx) {
- // Make sure that metadata and lengths are compatible.
- DCHECK(output->metadata().is_fixed_length == input.metadata().is_fixed_length);
- DCHECK(output->metadata().fixed_length == 0 && input.metadata().fixed_length == 1);
- DCHECK(output->length() == input.length());
- constexpr int buffer_index = 1;
- DCHECK(input.data(buffer_index) != nullptr);
- DCHECK(output->mutable_data(buffer_index) != nullptr);
-
- util::bit_util::bytes_to_bits(
- ctx->hardware_flags, static_cast<int>(input.length()), input.data(buffer_index),
- output->mutable_data(buffer_index), output->bit_offset(buffer_index));
-}
+} // namespace
-bool KeyEncoder::EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
+bool EncoderInteger::IsBoolean(const KeyColumnMetadata& metadata) {
return metadata.is_fixed_length && metadata.fixed_length == 0 && !metadata.is_null_type;
}
-bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) {
+bool EncoderInteger::UsesTransform(const KeyColumnArray& column) {
return IsBoolean(column.metadata());
}
-KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(const KeyColumnArray& column,
- const KeyColumnArray& temp) {
+KeyColumnArray EncoderInteger::ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp) {
if (IsBoolean(column.metadata())) {
return TransformBoolean::ArrayReplace(column, temp);
}
return column;
}
-void KeyEncoder::EncoderInteger::PostDecode(const KeyColumnArray& input,
- KeyColumnArray* output,
- KeyEncoderContext* ctx) {
+void EncoderInteger::PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+ LightContext* ctx) {
if (IsBoolean(output->metadata())) {
TransformBoolean::PostDecode(input, output, ctx);
}
}
-void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx, KeyColumnArray* temp) {
+void EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col, LightContext* ctx,
+ KeyColumnArray* temp) {
KeyColumnArray col_prep;
if (UsesTransform(*col)) {
col_prep = ArrayReplace(*col, *temp);
@@ -412,7 +317,111 @@ void KeyEncoder::EncoderInteger::Decode(uint32_t start_row, uint32_t num_rows,
}
}
-bool KeyEncoder::EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
+template <class COPY_FN, class SET_NULL_FN>
+void EncoderBinary::EncodeSelectedImp(uint32_t offset_within_row, RowTableImpl* rows,
+ const KeyColumnArray& col, uint32_t num_selected,
+ const uint16_t* selection, COPY_FN copy_fn,
+ SET_NULL_FN set_null_fn) {
+ bool is_fixed_length = rows->metadata().is_fixed_length;
+ if (is_fixed_length) {
+ uint32_t row_width = rows->metadata().fixed_length;
+ const uint8_t* src_base = col.data(1);
+ uint8_t* dst = rows->mutable_data(1) + offset_within_row;
+ for (uint32_t i = 0; i < num_selected; ++i) {
+ copy_fn(dst, src_base, selection[i]);
+ dst += row_width;
+ }
+ if (col.data(0)) {
+ const uint8_t* non_null_bits = col.data(0);
+ uint8_t* dst = rows->mutable_data(1) + offset_within_row;
+ for (uint32_t i = 0; i < num_selected; ++i) {
+ bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
+ if (is_null) {
+ set_null_fn(dst);
+ }
+ dst += row_width;
+ }
+ }
+ } else {
+ const uint8_t* src_base = col.data(1);
+ uint8_t* dst = rows->mutable_data(2) + offset_within_row;
+ const uint32_t* offsets = rows->offsets();
+ for (uint32_t i = 0; i < num_selected; ++i) {
+ copy_fn(dst + offsets[i], src_base, selection[i]);
+ }
+ if (col.data(0)) {
+ const uint8_t* non_null_bits = col.data(0);
+ uint8_t* dst = rows->mutable_data(2) + offset_within_row;
+ const uint32_t* offsets = rows->offsets();
+ for (uint32_t i = 0; i < num_selected; ++i) {
+ bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
+ if (is_null) {
+ set_null_fn(dst + offsets[i]);
+ }
+ }
+ }
+ }
+}
+
+void EncoderBinary::EncodeSelected(uint32_t offset_within_row, RowTableImpl* rows,
+ const KeyColumnArray& col, uint32_t num_selected,
+ const uint16_t* selection) {
+ if (col.metadata().is_null_type) {
+ return;
+ }
+ uint32_t col_width = col.metadata().fixed_length;
+ if (col_width == 0) {
+ int bit_offset = col.bit_offset(1);
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [bit_offset](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ *dst = bit_util::GetBit(src_base, irow + bit_offset) ? 0xff : 0x00;
+ },
+ [](uint8_t* dst) { *dst = 0xae; });
+ } else if (col_width == 1) {
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ *dst = src_base[irow];
+ },
+ [](uint8_t* dst) { *dst = 0xae; });
+ } else if (col_width == 2) {
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ *reinterpret_cast<uint16_t*>(dst) =
+ reinterpret_cast<const uint16_t*>(src_base)[irow];
+ },
+ [](uint8_t* dst) { *reinterpret_cast<uint16_t*>(dst) = 0xaeae; });
+ } else if (col_width == 4) {
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ *reinterpret_cast<uint32_t*>(dst) =
+ reinterpret_cast<const uint32_t*>(src_base)[irow];
+ },
+ [](uint8_t* dst) {
+ *reinterpret_cast<uint32_t*>(dst) = static_cast<uint32_t>(0xaeaeaeae);
+ });
+ } else if (col_width == 8) {
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ *reinterpret_cast<uint64_t*>(dst) =
+ reinterpret_cast<const uint64_t*>(src_base)[irow];
+ },
+ [](uint8_t* dst) { *reinterpret_cast<uint64_t*>(dst) = 0xaeaeaeaeaeaeaeaeULL; });
+ } else {
+ EncodeSelectedImp(
+ offset_within_row, rows, col, num_selected, selection,
+ [col_width](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
+ memcpy(dst, src_base + col_width * irow, col_width);
+ },
+ [col_width](uint8_t* dst) { memset(dst, 0xae, col_width); });
+ }
+}
+
+bool EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
if (metadata.is_null_type) {
return false;
}
@@ -422,10 +431,9 @@ bool KeyEncoder::EncoderBinary::IsInteger(const KeyColumnMetadata& metadata) {
(size == 0 || size == 1 || size == 2 || size == 4 || size == 8);
}
-void KeyEncoder::EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx, KeyColumnArray* temp) {
+void EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col, LightContext* ctx, KeyColumnArray* temp) {
if (IsInteger(col->metadata())) {
EncoderInteger::Decode(start_row, num_rows, offset_within_row, rows, col, ctx, temp);
} else {
@@ -460,9 +468,9 @@ void KeyEncoder::EncoderBinary::Decode(uint32_t start_row, uint32_t num_rows,
}
template <bool is_row_fixed_length>
-void KeyEncoder::EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col) {
+void EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col) {
DecodeHelper<is_row_fixed_length>(
start_row, num_rows, offset_within_row, &rows, nullptr, col, col,
[](uint8_t* dst, const uint8_t* src, int64_t length) {
@@ -474,11 +482,11 @@ void KeyEncoder::EncoderBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
});
}
-void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col1,
- KeyColumnArray* col2, KeyEncoderContext* ctx,
- KeyColumnArray* temp1, KeyColumnArray* temp2) {
+void EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col1, KeyColumnArray* col2,
+ LightContext* ctx, KeyColumnArray* temp1,
+ KeyColumnArray* temp2) {
DCHECK(CanProcessPair(col1->metadata(), col2->metadata()));
KeyColumnArray col_prep[2];
@@ -516,7 +524,7 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows
#endif
if (num_processed < num_rows) {
using DecodeImp_t = void (*)(uint32_t, uint32_t, uint32_t, uint32_t,
- const KeyRowArray&, KeyColumnArray*, KeyColumnArray*);
+ const RowTableImpl&, KeyColumnArray*, KeyColumnArray*);
static const DecodeImp_t DecodeImp_fn[] = {
DecodeImp<false, uint8_t, uint8_t>, DecodeImp<false, uint16_t, uint8_t>,
DecodeImp<false, uint32_t, uint8_t>, DecodeImp<false, uint64_t, uint8_t>,
@@ -549,12 +557,10 @@ void KeyEncoder::EncoderBinaryPair::Decode(uint32_t start_row, uint32_t num_rows
}
template <bool is_row_fixed_length, typename col1_type, typename col2_type>
-void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip,
- uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows,
- KeyColumnArray* col1,
- KeyColumnArray* col2) {
+void EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row,
+ uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col1,
+ KeyColumnArray* col2) {
DCHECK(rows.length() >= start_row + num_rows);
DCHECK(col1->length() == num_rows && col2->length() == num_rows);
@@ -593,10 +599,11 @@ void KeyEncoder::EncoderBinaryPair::DecodeImp(uint32_t num_rows_to_skip,
}
}
-void KeyEncoder::EncoderOffsets::Decode(
- uint32_t start_row, uint32_t num_rows, const KeyRowArray& rows,
- std::vector<KeyColumnArray>* varbinary_cols,
- const std::vector<uint32_t>& varbinary_cols_base_offset, KeyEncoderContext* ctx) {
+void EncoderOffsets::Decode(uint32_t start_row, uint32_t num_rows,
+ const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* varbinary_cols,
+ const std::vector<uint32_t>& varbinary_cols_base_offset,
+ LightContext* ctx) {
DCHECK(!varbinary_cols->empty());
DCHECK(varbinary_cols->size() == varbinary_cols_base_offset.size());
@@ -635,7 +642,7 @@ void KeyEncoder::EncoderOffsets::Decode(
uint32_t offset_within_row = rows.metadata().fixed_length;
for (size_t col = 0; col < varbinary_cols->size(); ++col) {
offset_within_row +=
- KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment);
+ RowTableMetadata::padding_for_alignment(offset_within_row, string_alignment);
uint32_t length = varbinary_ends[col] - offset_within_row;
offset_within_row = varbinary_ends[col];
uint32_t* col_offsets = (*varbinary_cols)[col].mutable_offsets();
@@ -644,440 +651,10 @@ void KeyEncoder::EncoderOffsets::Decode(
}
}
-// Copies the bytes of varbinary column number `varbinary_col_id` for rows
-// [start_row, start_row + num_rows) of `rows` into `col`. Dispatches to the AVX2
-// helper when it is compiled in and ctx reports AVX2 support; otherwise uses the
-// scalar DecodeImp, specialized on whether this is the first varbinary column.
-void KeyEncoder::EncoderVarBinary::Decode(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id,
- const KeyRowArray& rows, KeyColumnArray* col,
- KeyEncoderContext* ctx) {
- // Output column varbinary buffer needs an extra 32B
- // at the end in avx2 version and 8B otherwise.
-#if defined(ARROW_HAVE_AVX2)
- if (ctx->has_avx2()) {
- DecodeHelper_avx2(start_row, num_rows, varbinary_col_id, rows, col);
- } else {
-#endif
- if (varbinary_col_id == 0) {
- DecodeImp<true>(start_row, num_rows, varbinary_col_id, rows, col);
- } else {
- DecodeImp<false>(start_row, num_rows, varbinary_col_id, rows, col);
- }
-#if defined(ARROW_HAVE_AVX2)
- }
-#endif
-}
-
-// Scalar decode of one varbinary column: each value is copied in
-// CeilDiv(length, 8) stripes of 8 bytes, so up to 7 bytes past the value may be
-// read from the row and written to the column (see the padding note in Decode).
-template <bool first_varbinary_col>
-void KeyEncoder::EncoderVarBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id,
- const KeyRowArray& rows,
- KeyColumnArray* col) {
- DecodeHelper<first_varbinary_col>(
- start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
- [](uint8_t* dst, const uint8_t* src, int64_t length) {
- for (uint32_t istripe = 0; istripe < bit_util::CeilDiv(length, 8); ++istripe) {
- auto dst64 = reinterpret_cast<uint64_t*>(dst);
- auto src64 = reinterpret_cast<const uint64_t*>(src);
- // NOTE(review): the store uses SafeStore but the load dereferences src64
- // directly, which is UB if `src` is not 8-byte aligned.
- util::SafeStore(dst64 + istripe, src64[istripe]);
- }
- });
-}
-
-// Rebuilds the validity (non-null) bitmap of every output column for rows
-// [start_row, start_row + num_rows) of `rows`. In the row table a set bit in a
-// row's null mask marks the column as null; in the output bitmap a set bit marks
-// the value as valid. So the window is first filled with 1s and then the bits of
-// null rows are cleared. Null-type columns are skipped.
-void KeyEncoder::EncoderNulls::Decode(uint32_t start_row, uint32_t num_rows,
- const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols) {
- // Every output column needs to have a space for exactly the required number
- // of rows. It also needs to have non-nulls bit-vector allocated and mutable.
- DCHECK_GT(cols->size(), 0);
- for (auto& col : *cols) {
- DCHECK(col.length() == num_rows);
- DCHECK(col.mutable_data(0) || col.metadata().is_null_type);
- }
-
- const uint8_t* null_masks = rows.null_masks();
- uint32_t null_masks_bytes_per_row = rows.metadata().null_masks_bytes_per_row;
- for (size_t col = 0; col < cols->size(); ++col) {
- if ((*cols)[col].metadata().is_null_type) {
- continue;
- }
- uint8_t* non_nulls = (*cols)[col].mutable_data(0);
- const int bit_offset = (*cols)[col].bit_offset(0);
- DCHECK_LT(bit_offset, 8);
- // Mark the whole window valid. Bits below bit_offset in the first byte are
- // preserved. Bits past the end of the window in the first and last bytes are
- // also set to 1.
- non_nulls[0] |= 0xff << (bit_offset);
- if (bit_offset + num_rows > 8) {
- int bits_in_first_byte = 8 - bit_offset;
- memset(non_nulls + 1, 0xff, bit_util::BytesForBits(num_rows - bits_in_first_byte));
- }
- for (uint32_t row = 0; row < num_rows; ++row) {
- // Bit `col` (position in *cols) of this row's null-mask bytes.
- uint32_t null_masks_bit_id =
- (start_row + row) * null_masks_bytes_per_row * 8 + static_cast<uint32_t>(col);
- bool is_set = bit_util::GetBit(null_masks, null_masks_bit_id);
- if (is_set) {
- bit_util::ClearBit(non_nulls, bit_offset + row);
- }
- }
- }
-}
-
-// Number of columns whose metadata is not fixed-length (varying-length columns).
-uint32_t KeyEncoder::KeyRowMetadata::num_varbinary_cols() const {
- uint32_t result = 0;
- for (auto column_metadata : column_metadatas) {
- if (!column_metadata.is_fixed_length) {
- ++result;
- }
- }
- return result;
-}
-
-// Returns true when `other` has the same number of columns, the same row and
-// string alignment, and, column by column, the same is_fixed_length and
-// fixed_length. Other per-column metadata (e.g. is_null_type) is not compared.
-bool KeyEncoder::KeyRowMetadata::is_compatible(const KeyRowMetadata& other) const {
- if (other.num_cols() != num_cols()) {
- return false;
- }
- if (row_alignment != other.row_alignment ||
- string_alignment != other.string_alignment) {
- return false;
- }
- for (size_t i = 0; i < column_metadatas.size(); ++i) {
- if (column_metadatas[i].is_fixed_length !=
- other.column_metadatas[i].is_fixed_length) {
- return false;
- }
- if (column_metadatas[i].fixed_length != other.column_metadatas[i].fixed_length) {
- return false;
- }
- }
- return true;
-}
-
-// Computes the row layout for `cols`:
-// - the order in which columns are stored in a row (column_order);
-// - each stored column's byte offset (column_offsets);
-// - the offset of the first varbinary end-offset slot;
-// - the padded fixed-length size of a row;
-// - the number of null-mask bytes per row.
-void KeyEncoder::KeyRowMetadata::FromColumnMetadataVector(
- const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
- int in_string_alignment) {
- column_metadatas.resize(cols.size());
- for (size_t i = 0; i < cols.size(); ++i) {
- column_metadatas[i] = cols[i];
- }
-
- const auto num_cols = static_cast<uint32_t>(cols.size());
-
- // Sort columns.
- //
- // Columns are sorted based on the size in bytes of their fixed-length part.
- // For the varying-length column, the fixed-length part is the 32-bit field storing
- // cumulative length of varying-length fields.
- //
- // The rules are:
- //
- // a) Boolean column, marked with fixed-length 0, is considered to have fixed-length
- // part of 1 byte.
- //
- // b) Columns with fixed-length part being power of 2 or multiple of row
- // alignment precede other columns. They are sorted in decreasing order of the size of
- // their fixed-length part.
- //
- // c) Fixed-length columns precede varying-length columns when
- // both have the same size fixed-length part.
- //
- // NOTE(review): the comparator below only tests "power of 2" (popcount <= 1,
- // with varying-length columns counted as 4-byte). It does not test "multiple
- // of row alignment". It also compares booleans with width 0 rather than 1, so
- // they sort after 1-byte columns. Columns that are not a power of 2 keep their
- // original relative order.
- //
- column_order.resize(num_cols);
- for (uint32_t i = 0; i < num_cols; ++i) {
- column_order[i] = i;
- }
- std::sort(
- column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
- bool is_left_pow2 =
- !cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
- bool is_right_pow2 = !cols[right].is_fixed_length ||
- ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
- bool is_left_fixedlen = cols[left].is_fixed_length;
- bool is_right_fixedlen = cols[right].is_fixed_length;
- uint32_t width_left =
- cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
- uint32_t width_right =
- cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
- if (is_left_pow2 != is_right_pow2) {
- return is_left_pow2;
- }
- if (!is_left_pow2) {
- return left < right;
- }
- if (width_left != width_right) {
- return width_left > width_right;
- }
- if (is_left_fixedlen != is_right_fixedlen) {
- return is_left_fixedlen;
- }
- return left < right;
- });
-
- row_alignment = in_row_alignment;
- string_alignment = in_string_alignment;
- varbinary_end_array_offset = 0;
-
- column_offsets.resize(num_cols);
- uint32_t num_varbinary_cols = 0;
- uint32_t offset_within_row = 0;
- for (uint32_t i = 0; i < num_cols; ++i) {
- const KeyColumnMetadata& col = cols[column_order[i]];
- // Fixed-length columns whose width is not a power of 2 (they sort last)
- // start at a padded offset computed from string_alignment and the column.
- if (col.is_fixed_length && col.fixed_length != 0 &&
- ARROW_POPCOUNT64(col.fixed_length) != 1) {
- offset_within_row +=
- KeyRowMetadata::padding_for_alignment(offset_within_row, string_alignment, col);
- }
- column_offsets[i] = offset_within_row;
- if (!col.is_fixed_length) {
- // Varying-length columns each own one consecutive 32-bit slot starting at
- // varbinary_end_array_offset.
- if (num_varbinary_cols == 0) {
- varbinary_end_array_offset = offset_within_row;
- }
- DCHECK(column_offsets[i] - varbinary_end_array_offset ==
- num_varbinary_cols * sizeof(uint32_t));
- ++num_varbinary_cols;
- offset_within_row += sizeof(uint32_t);
- } else {
- // Boolean column is a bit-vector, which is indicated by
- // setting fixed length in column metadata to zero.
- // It will be stored as a byte in output row.
- if (col.fixed_length == 0) {
- offset_within_row += 1;
- } else {
- offset_within_row += col.fixed_length;
- }
- }
- }
-
- // The fixed-length part is padded to row_alignment when the row has no
- // varbinary data. Otherwise it is padded to string_alignment, presumably so
- // that the first varbinary value that follows starts string-aligned.
- is_fixed_length = (num_varbinary_cols == 0);
- fixed_length =
- offset_within_row +
- KeyRowMetadata::padding_for_alignment(
- offset_within_row, num_varbinary_cols == 0 ? row_alignment : string_alignment);
-
- // We set the number of bytes per row storing null masks of individual key columns
- // to be a power of two. This is not required. It could be also set to the minimal
- // number of bytes required for a given number of bits (one bit per column).
- null_masks_bytes_per_row = 1;
- while (static_cast<uint32_t>(null_masks_bytes_per_row * 8) < num_cols) {
- null_masks_bytes_per_row *= 2;
- }
-}
-
-// Stores the context, computes the row layout for `cols`, and sizes the
-// per-batch column vectors (all columns, and varbinary columns plus their base
-// offsets).
-void KeyEncoder::Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
- int row_alignment, int string_alignment) {
- ctx_ = ctx;
- row_metadata_.FromColumnMetadataVector(cols, row_alignment, string_alignment);
- uint32_t num_cols = row_metadata_.num_cols();
- uint32_t num_varbinary_cols = row_metadata_.num_varbinary_cols();
- batch_all_cols_.resize(num_cols);
- batch_varbinary_cols_.resize(num_varbinary_cols);
- batch_varbinary_cols_base_offsets_.resize(num_varbinary_cols);
-}
-
-// Slices every input column to [start_row, start_row + num_rows) and stores
-// the slices in row-layout order (row_metadata_.column_order) in
-// batch_all_cols_. Varying-length slices are also stored in
-// batch_varbinary_cols_, together with the column's offset at start_row in
-// batch_varbinary_cols_base_offsets_.
-void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
- const std::vector<KeyColumnArray>& cols_in) {
- const auto num_cols = static_cast<uint32_t>(cols_in.size());
- DCHECK(batch_all_cols_.size() == num_cols);
-
- uint32_t num_varbinary_visited = 0;
- for (uint32_t i = 0; i < num_cols; ++i) {
- const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
- KeyColumnArray col_window = col.Slice(start_row, num_rows);
-
- batch_all_cols_[i] = col_window;
- if (!col.metadata().is_fixed_length) {
- DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
- // If start row is zero, then base offset of varbinary column is also zero.
- // (Presumably this also avoids reading offsets not yet written when
- // decoding into a fresh output column. Confirm.)
- if (start_row == 0) {
- batch_varbinary_cols_base_offsets_[num_varbinary_visited] = 0;
- } else {
- batch_varbinary_cols_base_offsets_[num_varbinary_visited] =
- col.offsets()[start_row];
- }
- batch_varbinary_cols_[num_varbinary_visited++] = col_window;
- }
- }
-}
-
-// Decodes everything except the varbinary bytes for rows
-// [start_row_input, start_row_input + num_rows) of `rows`. Output goes into `cols`
-// starting at start_row_output, in three steps:
-// 1. varbinary offsets (only for varying-length rows);
-// 2. fixed-length columns, two at a time when EncoderBinaryPair::CanProcessPair
-//    allows it;
-// 3. validity bitmaps.
-void KeyEncoder::DecodeFixedLengthBuffers(int64_t start_row_input,
- int64_t start_row_output, int64_t num_rows,
- const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols) {
- // Prepare column array vectors
- PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
-
- // Create two temp vectors with 16-bit elements
- // (allocated from ctx_->stack and passed to the binary decoders as scratch).
- auto temp_buffer_holder_A =
- util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
- auto temp_buffer_A = KeyColumnArray(
- KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
- reinterpret_cast<uint8_t*>(temp_buffer_holder_A.mutable_data()), nullptr);
- auto temp_buffer_holder_B =
- util::TempVectorHolder<uint16_t>(ctx_->stack, static_cast<uint32_t>(num_rows));
- auto temp_buffer_B = KeyColumnArray(
- KeyColumnMetadata(true, sizeof(uint16_t)), num_rows, nullptr,
- reinterpret_cast<uint8_t*>(temp_buffer_holder_B.mutable_data()), nullptr);
-
- bool is_row_fixed_length = row_metadata_.is_fixed_length;
- if (!is_row_fixed_length) {
- EncoderOffsets::Decode(static_cast<uint32_t>(start_row_input),
- static_cast<uint32_t>(num_rows), rows, &batch_varbinary_cols_,
- batch_varbinary_cols_base_offsets_, ctx_);
- }
-
- // Process fixed length columns
- const auto num_cols = static_cast<uint32_t>(batch_all_cols_.size());
- for (uint32_t i = 0; i < num_cols;) {
- // Varying-length and null-type columns have no fixed-length data to decode.
- if (!batch_all_cols_[i].metadata().is_fixed_length ||
- batch_all_cols_[i].metadata().is_null_type) {
- i += 1;
- continue;
- }
- bool can_process_pair =
- (i + 1 < num_cols) && batch_all_cols_[i + 1].metadata().is_fixed_length &&
- EncoderBinaryPair::CanProcessPair(batch_all_cols_[i].metadata(),
- batch_all_cols_[i + 1].metadata());
- if (!can_process_pair) {
- EncoderBinary::Decode(static_cast<uint32_t>(start_row_input),
- static_cast<uint32_t>(num_rows),
- row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
- ctx_, &temp_buffer_A);
- i += 1;
- } else {
- EncoderBinaryPair::Decode(
- static_cast<uint32_t>(start_row_input), static_cast<uint32_t>(num_rows),
- row_metadata_.column_offsets[i], rows, &batch_all_cols_[i],
- &batch_all_cols_[i + 1], ctx_, &temp_buffer_A, &temp_buffer_B);
- i += 2;
- }
- }
-
- // Process nulls
- EncoderNulls::Decode(static_cast<uint32_t>(start_row_input),
- static_cast<uint32_t>(num_rows), rows, &batch_all_cols_);
-}
-
-// Second decode pass: copies the varbinary bytes of each varying-length column.
-// Does nothing when rows are fixed-length.
-void KeyEncoder::DecodeVaryingLengthBuffers(int64_t start_row_input,
- int64_t start_row_output, int64_t num_rows,
- const KeyRowArray& rows,
- std::vector<KeyColumnArray>* cols) {
- // Prepare column array vectors
- PrepareKeyColumnArrays(start_row_output, num_rows, *cols);
-
- bool is_row_fixed_length = row_metadata_.is_fixed_length;
- if (!is_row_fixed_length) {
- for (size_t i = 0; i < batch_varbinary_cols_.size(); ++i) {
- // Copy varbinary field bytes into the output column buffers, at the
- // positions given by the column offsets decoded by DecodeFixedLengthBuffers.
- EncoderVarBinary::Decode(static_cast<uint32_t>(start_row_input),
- static_cast<uint32_t>(num_rows), static_cast<uint32_t>(i),
- rows, &batch_varbinary_cols_[i], ctx_);
- }
- }
-}
-
-// Shared driver for EncoderBinary::EncodeSelected. For output row i, copy_fn
-// writes input value selection[i] at offset_within_row. Then, if the column has
-// a validity bitmap, set_null_fn overwrites the field of each null row.
-// Fixed-length rows are addressed with a row_width stride in buffer 1;
-// varying-length rows through rows->offsets() into buffer 2.
-template <class COPY_FN, class SET_NULL_FN>
-void KeyEncoder::EncoderBinary::EncodeSelectedImp(
- uint32_t offset_within_row, KeyRowArray* rows, const KeyColumnArray& col,
- uint32_t num_selected, const uint16_t* selection, COPY_FN copy_fn,
- SET_NULL_FN set_null_fn) {
- bool is_fixed_length = rows->metadata().is_fixed_length;
- if (is_fixed_length) {
- uint32_t row_width = rows->metadata().fixed_length;
- const uint8_t* src_base = col.data(1);
- uint8_t* dst = rows->mutable_data(1) + offset_within_row;
- for (uint32_t i = 0; i < num_selected; ++i) {
- copy_fn(dst, src_base, selection[i]);
- dst += row_width;
- }
- if (col.data(0)) {
- const uint8_t* non_null_bits = col.data(0);
- uint8_t* dst = rows->mutable_data(1) + offset_within_row;
- for (uint32_t i = 0; i < num_selected; ++i) {
- bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
- if (is_null) {
- set_null_fn(dst);
- }
- dst += row_width;
- }
- }
- } else {
- const uint8_t* src_base = col.data(1);
- uint8_t* dst = rows->mutable_data(2) + offset_within_row;
- const uint32_t* offsets = rows->offsets();
- for (uint32_t i = 0; i < num_selected; ++i) {
- copy_fn(dst + offsets[i], src_base, selection[i]);
- }
- if (col.data(0)) {
- const uint8_t* non_null_bits = col.data(0);
- uint8_t* dst = rows->mutable_data(2) + offset_within_row;
- const uint32_t* offsets = rows->offsets();
- for (uint32_t i = 0; i < num_selected; ++i) {
- bool is_null = !bit_util::GetBit(non_null_bits, selection[i] + col.bit_offset(0));
- if (is_null) {
- set_null_fn(dst + offsets[i]);
- }
- }
- }
- }
-}
-
-// Encodes the selected values of one fixed-length column into `rows`.
-// - Null-type columns write nothing.
-// - Booleans (fixed_length == 0) are stored as one byte: 0xff for true, 0x00 for
-//   false.
-// - Widths 1, 2, 4 and 8 use typed copies; other widths use memcpy.
-// - Every byte of a null value's field is filled with 0xae.
-// NOTE(review): the 2/4/8-byte cases store through typed pointers, which
-// assumes the field is suitably aligned in the row. Confirm against the
-// layout's alignment guarantees.
-void KeyEncoder::EncoderBinary::EncodeSelected(uint32_t offset_within_row,
- KeyRowArray* rows,
- const KeyColumnArray& col,
- uint32_t num_selected,
- const uint16_t* selection) {
- if (col.metadata().is_null_type) {
- return;
- }
- uint32_t col_width = col.metadata().fixed_length;
- if (col_width == 0) {
- int bit_offset = col.bit_offset(1);
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [bit_offset](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- *dst = bit_util::GetBit(src_base, irow + bit_offset) ? 0xff : 0x00;
- },
- [](uint8_t* dst) { *dst = 0xae; });
- } else if (col_width == 1) {
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- *dst = src_base[irow];
- },
- [](uint8_t* dst) { *dst = 0xae; });
- } else if (col_width == 2) {
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- *reinterpret_cast<uint16_t*>(dst) =
- reinterpret_cast<const uint16_t*>(src_base)[irow];
- },
- [](uint8_t* dst) { *reinterpret_cast<uint16_t*>(dst) = 0xaeae; });
- } else if (col_width == 4) {
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- *reinterpret_cast<uint32_t*>(dst) =
- reinterpret_cast<const uint32_t*>(src_base)[irow];
- },
- [](uint8_t* dst) {
- *reinterpret_cast<uint32_t*>(dst) = static_cast<uint32_t>(0xaeaeaeae);
- });
- } else if (col_width == 8) {
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- *reinterpret_cast<uint64_t*>(dst) =
- reinterpret_cast<const uint64_t*>(src_base)[irow];
- },
- [](uint8_t* dst) { *reinterpret_cast<uint64_t*>(dst) = 0xaeaeaeaeaeaeaeaeULL; });
- } else {
- EncodeSelectedImp(
- offset_within_row, rows, col, num_selected, selection,
- [col_width](uint8_t* dst, const uint8_t* src_base, uint16_t irow) {
- memcpy(dst, src_base + col_width * irow, col_width);
- },
- [col_width](uint8_t* dst) { memset(dst, 0xae, col_width); });
- }
-}
-
-void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
- KeyRowArray* rows, const std::vector<KeyColumnArray>& cols, uint32_t num_selected,
- const uint16_t* selection) {
+void EncoderOffsets::GetRowOffsetsSelected(RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected,
+ const uint16_t* selection) {
if (rows->metadata().is_fixed_length) {
return;
}
@@ -1094,7 +671,7 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
for (uint32_t i = 0; i < num_selected; ++i) {
uint32_t irow = selection[i];
uint32_t length = col_offsets[irow + 1] - col_offsets[irow];
- row_offsets[i] += KeyRowMetadata::padding_for_alignment(
+ row_offsets[i] += RowTableMetadata::padding_for_alignment(
row_offsets[i], rows->metadata().string_alignment);
row_offsets[i] += length;
}
@@ -1118,7 +695,7 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
int row_alignment = rows->metadata().row_alignment;
for (uint32_t i = 0; i < num_selected; ++i) {
uint32_t length = row_offsets[i];
- length += KeyRowMetadata::padding_for_alignment(length, row_alignment);
+ length += RowTableMetadata::padding_for_alignment(length, row_alignment);
row_offsets[i] = sum;
sum += length;
}
@@ -1126,9 +703,9 @@ void KeyEncoder::EncoderOffsets::GetRowOffsetsSelected(
}
template <bool has_nulls, bool is_first_varbinary>
-void KeyEncoder::EncoderOffsets::EncodeSelectedImp(
- uint32_t ivarbinary, KeyRowArray* rows, const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected, const uint16_t* selection) {
+void EncoderOffsets::EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection) {
const uint32_t* row_offsets = rows->offsets();
uint8_t* row_base = rows->mutable_data(2) +
rows->metadata().varbinary_end_array_offset +
@@ -1150,17 +727,16 @@ void KeyEncoder::EncoderOffsets::EncodeSelectedImp(
row[0] = rows->metadata().fixed_length + length;
} else {
row[0] = row[-1] +
- KeyRowMetadata::padding_for_alignment(row[-1],
- rows->metadata().string_alignment) +
+ RowTableMetadata::padding_for_alignment(
+ row[-1], rows->metadata().string_alignment) +
length;
}
}
}
-void KeyEncoder::EncoderOffsets::EncodeSelected(KeyRowArray* rows,
- const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected,
- const uint16_t* selection) {
+void EncoderOffsets::EncodeSelected(RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection) {
if (rows->metadata().is_fixed_length) {
return;
}
@@ -1182,10 +758,79 @@ void KeyEncoder::EncoderOffsets::EncodeSelected(KeyRowArray* rows,
}
}
-void KeyEncoder::EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, KeyRowArray* rows,
- const KeyColumnArray& cols,
- uint32_t num_selected,
- const uint16_t* selection) {
+// Copies the bytes of varbinary column number `varbinary_col_id` for rows
+// [start_row, start_row + num_rows) of `rows` into `col`. Dispatches to the AVX2
+// helper when it is compiled in and ctx reports AVX2 support; otherwise uses the
+// scalar DecodeImp, specialized on whether this is the first varbinary column.
+void EncoderVarBinary::Decode(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const RowTableImpl& rows,
+ KeyColumnArray* col, LightContext* ctx) {
+ // Output column varbinary buffer needs an extra 32B
+ // at the end in avx2 version and 8B otherwise.
+#if defined(ARROW_HAVE_AVX2)
+ if (ctx->has_avx2()) {
+ DecodeHelper_avx2(start_row, num_rows, varbinary_col_id, rows, col);
+ } else {
+#endif
+ if (varbinary_col_id == 0) {
+ DecodeImp<true>(start_row, num_rows, varbinary_col_id, rows, col);
+ } else {
+ DecodeImp<false>(start_row, num_rows, varbinary_col_id, rows, col);
+ }
+#if defined(ARROW_HAVE_AVX2)
+ }
+#endif
+}
+
+// Scalar (non-AVX2) decode of one varbinary column.
+//
+// Each value is copied in CeilDiv(length, 8) stripes of 8 bytes. Up to 7 bytes
+// past the value may therefore be read from the row and written to the column;
+// this relies on the buffer padding described in EncoderVarBinary::Decode.
+// Both the load and the store go through SafeLoad/SafeStore. The destination
+// is an arbitrary position in the output column, and the source position in the
+// row depends on the runtime string_alignment, so neither is guaranteed to be
+// 8-byte aligned. Dereferencing a misaligned `const uint64_t*` is undefined
+// behavior.
+template <bool first_varbinary_col>
+void EncoderVarBinary::DecodeImp(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const RowTableImpl& rows,
+ KeyColumnArray* col) {
+ DecodeHelper<first_varbinary_col>(
+ start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
+ [](uint8_t* dst, const uint8_t* src, int64_t length) {
+ const int64_t num_stripes = bit_util::CeilDiv(length, 8);
+ auto dst64 = reinterpret_cast<uint64_t*>(dst);
+ auto src64 = reinterpret_cast<const uint64_t*>(src);
+ for (int64_t istripe = 0; istripe < num_stripes; ++istripe) {
+ util::SafeStore(dst64 + istripe, util::SafeLoad(src64 + istripe));
+ }
+ });
+}
+
+// Rebuilds the validity (non-null) bitmap of every output column for rows
+// [start_row, start_row + num_rows) of `rows`. In the row table a set bit in a
+// row's null mask marks the column as null; in the output bitmap a set bit marks
+// the value as valid. So the window is first filled with 1s and then the bits of
+// null rows are cleared. Null-type columns are skipped.
+void EncoderNulls::Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols) {
+ // Every output column needs to have a space for exactly the required number
+ // of rows. It also needs to have non-nulls bit-vector allocated and mutable.
+ DCHECK_GT(cols->size(), 0);
+ for (auto& col : *cols) {
+ DCHECK(col.length() == num_rows);
+ DCHECK(col.mutable_data(0) || col.metadata().is_null_type);
+ }
+
+ const uint8_t* null_masks = rows.null_masks();
+ uint32_t null_masks_bytes_per_row = rows.metadata().null_masks_bytes_per_row;
+ for (size_t col = 0; col < cols->size(); ++col) {
+ if ((*cols)[col].metadata().is_null_type) {
+ continue;
+ }
+ uint8_t* non_nulls = (*cols)[col].mutable_data(0);
+ const int bit_offset = (*cols)[col].bit_offset(0);
+ DCHECK_LT(bit_offset, 8);
+ // Mark the whole window valid. Bits below bit_offset in the first byte are
+ // preserved. Bits past the end of the window in the first and last bytes are
+ // also set to 1.
+ non_nulls[0] |= 0xff << (bit_offset);
+ if (bit_offset + num_rows > 8) {
+ int bits_in_first_byte = 8 - bit_offset;
+ memset(non_nulls + 1, 0xff, bit_util::BytesForBits(num_rows - bits_in_first_byte));
+ }
+ for (uint32_t row = 0; row < num_rows; ++row) {
+ // Bit `col` (position in *cols) of this row's null-mask bytes.
+ uint32_t null_masks_bit_id =
+ (start_row + row) * null_masks_bytes_per_row * 8 + static_cast<uint32_t>(col);
+ bool is_set = bit_util::GetBit(null_masks, null_masks_bit_id);
+ if (is_set) {
+ bit_util::ClearBit(non_nulls, bit_offset + row);
+ }
+ }
+ }
+}
+
+void EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
+ const KeyColumnArray& cols, uint32_t num_selected,
+ const uint16_t* selection) {
const uint32_t* row_offsets = rows->offsets();
uint8_t* row_base = rows->mutable_data(2);
const uint32_t* col_offsets = cols.offsets();
@@ -1213,10 +858,9 @@ void KeyEncoder::EncoderVarBinary::EncodeSelected(uint32_t ivarbinary, KeyRowArr
}
}
-void KeyEncoder::EncoderNulls::EncodeSelected(KeyRowArray* rows,
- const std::vector<KeyColumnArray>& cols,
- uint32_t num_selected,
- const uint16_t* selection) {
+void EncoderNulls::EncodeSelected(RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection) {
uint8_t* null_masks = rows->null_masks();
uint32_t null_mask_num_bytes = rows->metadata().null_masks_bytes_per_row;
memset(null_masks, 0, null_mask_num_bytes * num_selected);
@@ -1234,44 +878,5 @@ void KeyEncoder::EncoderNulls::EncodeSelected(KeyRowArray* rows,
}
}
-// Slices `cols` to [start_row, start_row + num_rows) into the per-batch vectors
-// that EncodeSelected reads from.
-void KeyEncoder::PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
- const std::vector<KeyColumnArray>& cols) {
- // Prepare column array vectors
- PrepareKeyColumnArrays(start_row, num_rows, cols);
-}
-
-// Clears `rows` and encodes the num_selected prepared rows given by `selection`
-// into it. Steps, in order:
-// 1. allocate the rows;
-// 2. compute row offsets;
-// 3. allocate the varbinary bytes;
-// 4. write fixed-length fields, varbinary end offsets, varbinary bytes, and
-//    finally null masks.
-// Returns the error Status from AppendEmpty if growing `rows` fails.
-Status KeyEncoder::EncodeSelected(KeyRowArray* rows, uint32_t num_selected,
- const uint16_t* selection) {
- rows->Clean();
- RETURN_NOT_OK(
- rows->AppendEmpty(static_cast<uint32_t>(num_selected), static_cast<uint32_t>(0)));
-
- EncoderOffsets::GetRowOffsetsSelected(rows, batch_varbinary_cols_, num_selected,
- selection);
-
- // Grow the varying-length storage by the size implied by the final row offset.
- // NOTE(review): GetRowOffsetsSelected returns early for fixed-length rows;
- // confirm what offsets() yields in that case.
- RETURN_NOT_OK(rows->AppendEmpty(static_cast<uint32_t>(0),
- static_cast<uint32_t>(rows->offsets()[num_selected])));
-
- for (size_t icol = 0; icol < batch_all_cols_.size(); ++icol) {
- if (batch_all_cols_[icol].metadata().is_fixed_length) {
- uint32_t offset_within_row = rows->metadata().column_offsets[icol];
- EncoderBinary::EncodeSelected(offset_within_row, rows, batch_all_cols_[icol],
- num_selected, selection);
- }
- }
-
- EncoderOffsets::EncodeSelected(rows, batch_varbinary_cols_, num_selected, selection);
-
- for (size_t icol = 0; icol < batch_varbinary_cols_.size(); ++icol) {
- EncoderVarBinary::EncodeSelected(static_cast<uint32_t>(icol), rows,
- batch_varbinary_cols_[icol], num_selected,
- selection);
- }
-
- EncoderNulls::EncodeSelected(rows, batch_all_cols_, num_selected, selection);
-
- return Status::OK();
-}
-
} // namespace compute
} // namespace arrow
diff --git a/cpp/src/arrow/compute/row/encode_internal.h b/cpp/src/arrow/compute/row/encode_internal.h
new file mode 100644
index 0000000000..ce88731346
--- /dev/null
+++ b/cpp/src/arrow/compute/row/encode_internal.h
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array/data.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/row/row_internal.h"
+#include "arrow/memory_pool.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/bit_util.h"
+
+namespace arrow {
+namespace compute {
+
+/// Converts between Arrow's typical column representation and a row-based
+/// representation
+///
+/// Data is stored as a single array of rows. Each row combines data from all columns.
+/// The conversion is reversible.
+///
+/// Row-oriented storage is beneficial when there is a need for random access
+/// of individual rows and at the same time all included columns are likely to
+/// be accessed together, as in the case of hash table keys.
+///
+/// Does not support nested types
+class RowTableEncoder {
+ public:
+ /// \brief Compute the row layout for a set of columns
+ /// \param cols Metadata of the columns to encode
+ /// \param ctx Context providing hardware flags and a temp stack; borrowed, not owned
+ /// \param row_alignment Alignment, in bytes, applied to each row
+ /// \param string_alignment Alignment, in bytes, applied to varying-length fields
+ ///
+ /// Must be called before any other method.
+ void Init(const std::vector<KeyColumnMetadata>& cols, LightContext* ctx,
+ int row_alignment, int string_alignment);
+
+ /// \brief The row layout computed by Init
+ const RowTableMetadata& row_metadata() const { return row_metadata_; }
+ // GrouperFastImpl right now needs somewhat intrusive visibility into RowTableEncoder
+ // This could be cleaned up at some point
+ /// \brief Column windows from the last prepare/decode call, in row-layout order
+ const std::vector<KeyColumnArray>& batch_all_cols() const {
+ return batch_all_cols_;
+ }
+
+ /// \brief Prepare to encode a collection of columns
+ /// \param start_row The starting row to encode
+ /// \param num_rows The number of rows to encode
+ /// \param cols The columns to encode. The order of the columns should
+ /// be consistent with the order used to create the RowTableMetadata
+ void PrepareEncodeSelected(int64_t start_row, int64_t num_rows,
+ const std::vector<KeyColumnArray>& cols);
+ /// \brief Encode selection of prepared rows into a row table
+ /// \param rows The output row table
+ /// \param num_selected The number of rows to encode
+ /// \param selection indices of the rows to encode
+ Status EncodeSelected(RowTableImpl* rows, uint32_t num_selected,
+ const uint16_t* selection);
+
+ /// \brief Decode a window of row oriented data into a corresponding
+ /// window of column oriented storage.
+ /// \param start_row_input The starting row to decode
+ /// \param start_row_output An offset into the output array to write to
+ /// \param num_rows The number of rows to decode
+ /// \param rows The row table to decode from
+ /// \param cols The columns to decode into, should be sized appropriately
+ ///
+ /// The output buffers need to be correctly allocated and sized before
+ /// calling each method. For that reason decoding is split into two functions.
+ /// DecodeFixedLengthBuffers processes everything except for varying length
+ /// buffers.
+ /// The output can be used to find out required varying length buffers sizes
+ /// for the call to DecodeVaryingLengthBuffers
+ void DecodeFixedLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+ int64_t num_rows, const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols);
+
+ /// \brief Decode the varlength columns of a row table into column storage
+ /// \param start_row_input The starting row to decode
+ /// \param start_row_output An offset into the output arrays
+ /// \param num_rows The number of rows to decode
+ /// \param rows The row table to decode from
+ /// \param cols The column arrays to decode into
+ void DecodeVaryingLengthBuffers(int64_t start_row_input, int64_t start_row_output,
+ int64_t num_rows, const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols);
+
+ private:
+ /// Prepare column array vectors.
+ /// Output column arrays represent a range of input column arrays
+ /// specified by starting row and number of rows.
+ /// Two vectors are generated, both ordered as the fields are laid out in a row:
+ /// - all columns (batch_all_cols_)
+ /// - varying-length columns only (batch_varbinary_cols_)
+ /// The offset at start_row of each varying-length column is also recorded in
+ /// batch_varbinary_cols_base_offsets_.
+ void PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
+ const std::vector<KeyColumnArray>& cols_in);
+
+ // Borrowed context, set by Init.
+ LightContext* ctx_ = nullptr;
+
+ // Data initialized once, based on data types of key columns
+ RowTableMetadata row_metadata_;
+
+ // Data initialized for each input batch.
+ // All elements are ordered according to the order of encoded fields in a row.
+ std::vector<KeyColumnArray> batch_all_cols_;
+ std::vector<KeyColumnArray> batch_varbinary_cols_;
+ std::vector<uint32_t> batch_varbinary_cols_base_offsets_;
+};
+
+/// Decode helpers for integer-like fixed-length columns.
+/// NOTE(review): the definitions are not visible here. The notes below are
+/// inferred from the signatures only and should be confirmed.
+class EncoderInteger {
+ public:
+ /// Presumably decodes rows [start_row, start_row + num_rows) of `rows`, reading
+ /// the field at offset_within_row, into `col`, with `temp` as scratch. Confirm.
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx,
+ KeyColumnArray* temp);
+ /// Presumably: whether `column` must be decoded through a temporary column
+ /// and then converted with PostDecode. Confirm.
+ static bool UsesTransform(const KeyColumnArray& column);
+ /// Presumably returns a view of `column` backed by `temp`'s storage. Confirm.
+ static KeyColumnArray ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp);
+ /// Presumably converts a decoded temporary `input` into `output`. Confirm.
+ static void PostDecode(const KeyColumnArray& input, KeyColumnArray* output,
+ LightContext* ctx);
+
+ private:
+ static bool IsBoolean(const KeyColumnMetadata& metadata);
+};
+
+/// Encode/decode helpers for a single fixed-length column stored inside each row.
+class EncoderBinary {
+ public:
+ /// Write the values selection[0..num_selected) of `col` into output rows
+ /// 0..num_selected of `rows`, at byte offset_within_row of each row.
+ static void EncodeSelected(uint32_t offset_within_row, RowTableImpl* rows,
+ const KeyColumnArray& col, uint32_t num_selected,
+ const uint16_t* selection);
+ /// Decode the field at offset_within_row of rows [start_row, start_row + num_rows)
+ /// into `col`. `temp` is a caller-provided scratch column.
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx,
+ KeyColumnArray* temp);
+ /// Used by EncoderBinaryPair::CanProcessPair to decide if two columns can be
+ /// decoded together.
+ static bool IsInteger(const KeyColumnMetadata& metadata);
+
+ private:
+ template <class COPY_FN, class SET_NULL_FN>
+ static void EncodeSelectedImp(uint32_t offset_within_row, RowTableImpl* rows,
+ const KeyColumnArray& col, uint32_t num_selected,
+ const uint16_t* selection, COPY_FN copy_fn,
+ SET_NULL_FN set_null_fn);
+
+ /// Calls copy_fn(dst, src, col_width) once per row i in [0, num_rows).
+ /// - src points at the field inside row (start_row + i).
+ /// - dst points at element i of buffer 1 of col_mutable_maybe_null.
+ /// Rows are addressed with a fixed stride when is_row_fixed_length, otherwise
+ /// through rows_const->offsets().
+ /// NOTE(review): col_mutable_maybe_null is dereferenced unconditionally,
+ /// rows_mutable_maybe_null is unused here, and the fixed-stride row address is
+ /// computed in 32-bit arithmetic.
+ template <bool is_row_fixed_length, class COPY_FN>
+ static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row,
+ const RowTableImpl* rows_const,
+ RowTableImpl* rows_mutable_maybe_null,
+ const KeyColumnArray* col_const,
+ KeyColumnArray* col_mutable_maybe_null,
+ COPY_FN copy_fn) {
+ ARROW_DCHECK(col_const && col_const->metadata().is_fixed_length);
+ uint32_t col_width = col_const->metadata().fixed_length;
+
+ if (is_row_fixed_length) {
+ uint32_t row_width = rows_const->metadata().fixed_length;
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ const uint8_t* src;
+ uint8_t* dst;
+ src = rows_const->data(1) + row_width * (start_row + i) + offset_within_row;
+ dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+ copy_fn(dst, src, col_width);
+ }
+ } else {
+ const uint32_t* row_offsets = rows_const->offsets();
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ const uint8_t* src;
+ uint8_t* dst;
+ src = rows_const->data(2) + row_offsets[start_row + i] + offset_within_row;
+ dst = col_mutable_maybe_null->mutable_data(1) + col_width * i;
+ copy_fn(dst, src, col_width);
+ }
+ }
+ }
+
+ template <bool is_row_fixed_length>
+ static void DecodeImp(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+ static void DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
+ uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col);
+ template <bool is_row_fixed_length>
+ static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col);
+#endif
+};
+
+/// Decodes two fixed-length columns that are adjacent in the row layout in a
+/// single pass. The pair's field starts at offset_within_row of each row.
+class EncoderBinaryPair {
+ public:
+ /// True when both columns satisfy EncoderBinary::IsInteger.
+ static bool CanProcessPair(const KeyColumnMetadata& col1,
+ const KeyColumnMetadata& col2) {
+ return EncoderBinary::IsInteger(col1) && EncoderBinary::IsInteger(col2);
+ }
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col1, KeyColumnArray* col2,
+ LightContext* ctx, KeyColumnArray* temp1, KeyColumnArray* temp2);
+
+ private:
+ /// col1_type/col2_type are the C types used to read each column.
+ /// NOTE(review): num_rows_to_skip presumably counts rows already handled by
+ /// the AVX2 helper (which returns a uint32_t). Confirm.
+ template <bool is_row_fixed_length, typename col1_type, typename col2_type>
+ static void DecodeImp(uint32_t num_rows_to_skip, uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col1, KeyColumnArray* col2);
+#if defined(ARROW_HAVE_AVX2)
+ static uint32_t DecodeHelper_avx2(bool is_row_fixed_length, uint32_t col_width,
+ uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col1, KeyColumnArray* col2);
+ template <bool is_row_fixed_length, uint32_t col_width>
+ static uint32_t DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col1, KeyColumnArray* col2);
+#endif
+};
+
+// Handles the per-row offset bookkeeping for varbinary columns of a row table.
+// NOTE(review): all definitions live outside this chunk; descriptions are inferred
+// from the declarations only — confirm against encode_internal.cc.
+class EncoderOffsets {
+ public:
+ // Presumably computes the row offsets of `rows` for the `num_selected` rows of
+ // `cols` listed in `selection` — TODO confirm.
+ static void GetRowOffsetsSelected(RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection);
+ // Encodes offset information for the selected rows of `cols` into `rows`.
+ static void EncodeSelected(RowTableImpl* rows, const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection);
+
+ // Decodes offsets for rows [start_row, start_row + num_rows) into the varbinary
+ // columns; varbinary_cols_base_offset presumably holds one starting offset per
+ // column — verify.
+ static void Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* varbinary_cols,
+ const std::vector<uint32_t>& varbinary_cols_base_offset,
+ LightContext* ctx);
+
+ private:
+ // Encoding for the varbinary column with index `ivarbinary`, specialized on whether
+ // nulls are present and whether this is the first varbinary column.
+ template <bool has_nulls, bool is_first_varbinary>
+ static void EncodeSelectedImp(uint32_t ivarbinary, RowTableImpl* rows,
+ const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection);
+};
+
+// Encodes/decodes the bytes of variable-length (varbinary) columns in a row table.
+class EncoderVarBinary {
+ public:
+ // Encodes the selected rows of varbinary column number `ivarbinary` into `rows`
+ // (definition outside this chunk).
+ static void EncodeSelected(uint32_t ivarbinary, RowTableImpl* rows,
+ const KeyColumnArray& cols, uint32_t num_selected,
+ const uint16_t* selection);
+
+ // Decodes varbinary column `varbinary_col_id` for rows [start_row,
+ // start_row + num_rows) into `col` (definition outside this chunk).
+ static void Decode(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+ const RowTableImpl& rows, KeyColumnArray* col, LightContext* ctx);
+
+ private:
+ // Copies the bytes of one varbinary column out of each row into `col`.
+ //
+ // The column's offsets are only read here, never written, so they must already be
+ // populated: value i is written at col->mutable_data(2) + col_offsets[i]. The
+ // location and length of the value inside the row are obtained from the row
+ // metadata (first_varbinary_offset_and_length for the first varbinary column,
+ // nth_varbinary_offset_and_length otherwise), and `copy_fn(dst, src, length)`
+ // performs the copy. `rows_mutable_maybe_null` is unused.
+ template <bool first_varbinary_col, class COPY_FN>
+ static inline void DecodeHelper(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id,
+ const RowTableImpl* rows_const,
+ RowTableImpl* rows_mutable_maybe_null,
+ const KeyColumnArray* col_const,
+ KeyColumnArray* col_mutable_maybe_null,
+ COPY_FN copy_fn) {
+ // Column and rows need to be varying length
+ ARROW_DCHECK(!rows_const->metadata().is_fixed_length &&
+ !col_const->metadata().is_fixed_length);
+
+ const uint32_t* row_offsets_for_batch = rows_const->offsets() + start_row;
+ const uint32_t* col_offsets = col_const->offsets();
+
+ uint32_t col_offset_next = col_offsets[0];
+ for (uint32_t i = 0; i < num_rows; ++i) {
+ uint32_t col_offset = col_offset_next;
+ col_offset_next = col_offsets[i + 1];
+
+ uint32_t row_offset = row_offsets_for_batch[i];
+ const uint8_t* row = rows_const->data(2) + row_offset;
+
+ // Where this column's value starts inside the row, and how long it is.
+ uint32_t offset_within_row;
+ uint32_t length;
+ if (first_varbinary_col) {
+ rows_const->metadata().first_varbinary_offset_and_length(row, &offset_within_row,
+ &length);
+ } else {
+ rows_const->metadata().nth_varbinary_offset_and_length(
+ row, varbinary_col_id, &offset_within_row, &length);
+ }
+
+ row_offset += offset_within_row;
+
+ const uint8_t* src;
+ uint8_t* dst;
+ src = rows_const->data(2) + row_offset;
+ dst = col_mutable_maybe_null->mutable_data(2) + col_offset;
+ copy_fn(dst, src, length);
+ }
+ }
+ // Scalar decode (definition outside this chunk).
+ template <bool first_varbinary_col>
+ static void DecodeImp(uint32_t start_row, uint32_t num_rows, uint32_t varbinary_col_id,
+ const RowTableImpl& rows, KeyColumnArray* col);
+#if defined(ARROW_HAVE_AVX2)
+ // Forwards to DecodeImp_avx2<true> when varbinary_col_id == 0, else to
+ // DecodeImp_avx2<false>.
+ static void DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const RowTableImpl& rows,
+ KeyColumnArray* col);
+ // Calls DecodeHelper<first_varbinary_col> with `col` as both const and mutable column
+ // and an AVX2-specific copy lambda (body outside this chunk).
+ template <bool first_varbinary_col>
+ static void DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const RowTableImpl& rows,
+ KeyColumnArray* col);
+#endif
+};
+
+// Encodes/decodes per-column null information of a row table.
+// NOTE(review): definitions are outside this chunk — descriptions are inferred from
+// the declarations and should be confirmed.
+class EncoderNulls {
+ public:
+ // Writes null information of the selected rows of `cols` into `rows`.
+ static void EncodeSelected(RowTableImpl* rows, const std::vector<KeyColumnArray>& cols,
+ uint32_t num_selected, const uint16_t* selection);
+
+ // Restores null information for rows [start_row, start_row + num_rows) into `cols`.
+ static void Decode(uint32_t start_row, uint32_t num_rows, const RowTableImpl& rows,
+ std::vector<KeyColumnArray>* cols);
+};
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/key_encode_avx2.cc b/cpp/src/arrow/compute/row/encode_internal_avx2.cc
similarity index 83%
rename from cpp/src/arrow/compute/exec/key_encode_avx2.cc
rename to cpp/src/arrow/compute/row/encode_internal_avx2.cc
index 832bb0361d..02ba310bde 100644
--- a/cpp/src/arrow/compute/exec/key_encode_avx2.cc
+++ b/cpp/src/arrow/compute/row/encode_internal_avx2.cc
@@ -17,18 +17,16 @@
#include <immintrin.h>
-#include "arrow/compute/exec/key_encode.h"
+#include "arrow/compute/row/encode_internal.h"
namespace arrow {
namespace compute {
#if defined(ARROW_HAVE_AVX2)
-void KeyEncoder::EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length,
- uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows,
- KeyColumnArray* col) {
+void EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length, uint32_t start_row,
+ uint32_t num_rows, uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col) {
if (is_row_fixed_length) {
DecodeImp_avx2<true>(start_row, num_rows, offset_within_row, rows, col);
} else {
@@ -37,10 +35,9 @@ void KeyEncoder::EncoderBinary::DecodeHelper_avx2(bool is_row_fixed_length,
}
template <bool is_row_fixed_length>
-void KeyEncoder::EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row,
- const KeyRowArray& rows,
- KeyColumnArray* col) {
+void EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row, const RowTableImpl& rows,
+ KeyColumnArray* col) {
DecodeHelper<is_row_fixed_length>(
start_row, num_rows, offset_within_row, &rows, nullptr, col, col,
[](uint8_t* dst, const uint8_t* src, int64_t length) {
@@ -52,13 +49,13 @@ void KeyEncoder::EncoderBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_
});
}
-uint32_t KeyEncoder::EncoderBinaryPair::DecodeHelper_avx2(
+uint32_t EncoderBinaryPair::DecodeHelper_avx2(
bool is_row_fixed_length, uint32_t col_width, uint32_t start_row, uint32_t num_rows,
- uint32_t offset_within_row, const KeyRowArray& rows, KeyColumnArray* col1,
+ uint32_t offset_within_row, const RowTableImpl& rows, KeyColumnArray* col1,
KeyColumnArray* col2) {
using DecodeImp_avx2_t =
uint32_t (*)(uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col1, KeyColumnArray* col2);
+ const RowTableImpl& rows, KeyColumnArray* col1, KeyColumnArray* col2);
static const DecodeImp_avx2_t DecodeImp_avx2_fn[] = {
DecodeImp_avx2<false, 1>, DecodeImp_avx2<false, 2>, DecodeImp_avx2<false, 4>,
DecodeImp_avx2<false, 8>, DecodeImp_avx2<true, 1>, DecodeImp_avx2<true, 2>,
@@ -70,9 +67,10 @@ uint32_t KeyEncoder::EncoderBinaryPair::DecodeHelper_avx2(
}
template <bool is_row_fixed_length, uint32_t col_width>
-uint32_t KeyEncoder::EncoderBinaryPair::DecodeImp_avx2(
- uint32_t start_row, uint32_t num_rows, uint32_t offset_within_row,
- const KeyRowArray& rows, KeyColumnArray* col1, KeyColumnArray* col2) {
+uint32_t EncoderBinaryPair::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t offset_within_row,
+ const RowTableImpl& rows, KeyColumnArray* col1,
+ KeyColumnArray* col2) {
ARROW_DCHECK(col_width == 1 || col_width == 2 || col_width == 4 || col_width == 8);
uint8_t* col_vals_A = col1->mutable_data(1);
@@ -207,11 +205,9 @@ uint32_t KeyEncoder::EncoderBinaryPair::DecodeImp_avx2(
return num_processed;
}
-void KeyEncoder::EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row,
- uint32_t num_rows,
- uint32_t varbinary_col_id,
- const KeyRowArray& rows,
- KeyColumnArray* col) {
+void EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id,
+ const RowTableImpl& rows, KeyColumnArray* col) {
if (varbinary_col_id == 0) {
DecodeImp_avx2<true>(start_row, num_rows, varbinary_col_id, rows, col);
} else {
@@ -220,10 +216,9 @@ void KeyEncoder::EncoderVarBinary::DecodeHelper_avx2(uint32_t start_row,
}
template <bool first_varbinary_col>
-void KeyEncoder::EncoderVarBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
- uint32_t varbinary_col_id,
- const KeyRowArray& rows,
- KeyColumnArray* col) {
+void EncoderVarBinary::DecodeImp_avx2(uint32_t start_row, uint32_t num_rows,
+ uint32_t varbinary_col_id, const RowTableImpl& rows,
+ KeyColumnArray* col) {
DecodeHelper<first_varbinary_col>(
start_row, num_rows, varbinary_col_id, &rows, nullptr, col, col,
[](uint8_t* dst, const uint8_t* src, int64_t length) {
diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc
new file mode 100644
index 0000000000..eb55124b17
--- /dev/null
+++ b/cpp/src/arrow/compute/row/grouper.cc
@@ -0,0 +1,590 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/row/grouper.h"
+
+#include <mutex>
+
+#include "arrow/compute/exec/key_hash.h"
+#include "arrow/compute/exec/key_map.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/exec_internal.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/kernels/row_encoder.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/compute/registry.h"
+#include "arrow/compute/row/compare_internal.h"
+#include "arrow/type.h"
+#include "arrow/util/bitmap_ops.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+#include "arrow/util/task_group.h"
+
+namespace arrow {
+
+using internal::checked_cast;
+
+namespace compute {
+
+namespace {
+
+// Generic grouper. Each row's keys are serialized into one byte string by the
+// per-column KeyEncoders, and that string is looked up in an std::unordered_map to
+// obtain the row's group id. New keys get the next id in order of first appearance.
+struct GrouperImpl : Grouper {
+  /// Create a grouper for `keys`; returns NotImplemented for unsupported key types.
+  static Result<std::unique_ptr<GrouperImpl>> Make(const std::vector<ValueDescr>& keys,
+                                                   ExecContext* ctx) {
+    auto grouper = ::arrow::internal::make_unique<GrouperImpl>();
+    grouper->ctx_ = ctx;
+    grouper->encoders_.resize(keys.size());
+
+    for (size_t col = 0; col < keys.size(); ++col) {
+      const auto& key_type = keys[col].type;
+      std::unique_ptr<internal::KeyEncoder>& encoder = grouper->encoders_[col];
+
+      // Checks stay in this order (BOOL and DICTIONARY ahead of the generic
+      // fixed-width test) — presumably because is_fixed_width() also accepts them.
+      if (key_type->id() == Type::BOOL) {
+        encoder = ::arrow::internal::make_unique<internal::BooleanKeyEncoder>();
+      } else if (key_type->id() == Type::DICTIONARY) {
+        encoder = ::arrow::internal::make_unique<internal::DictionaryKeyEncoder>(
+            key_type, ctx->memory_pool());
+      } else if (is_fixed_width(key_type->id())) {
+        encoder = ::arrow::internal::make_unique<internal::FixedWidthKeyEncoder>(key_type);
+      } else if (is_binary_like(key_type->id())) {
+        encoder =
+            ::arrow::internal::make_unique<internal::VarLengthKeyEncoder<BinaryType>>(
+                key_type);
+      } else if (is_large_binary_like(key_type->id())) {
+        encoder = ::arrow::internal::make_unique<
+            internal::VarLengthKeyEncoder<LargeBinaryType>>(key_type);
+      } else if (key_type->id() == Type::NA) {
+        encoder = ::arrow::internal::make_unique<internal::NullKeyEncoder>();
+      } else {
+        return Status::NotImplemented("Keys of type ", *key_type);
+      }
+    }
+
+    return std::move(grouper);
+  }
+
+  Result<Datum> Consume(const ExecBatch& batch) override {
+    const int64_t num_rows = batch.length;
+
+    // Pass 1: every encoder adds its per-row encoded length into row_offsets[row].
+    std::vector<int32_t> row_offsets(num_rows + 1);
+    for (int col = 0; col < batch.num_values(); ++col) {
+      encoders_[col]->AddLength(batch[col], num_rows, row_offsets.data());
+    }
+
+    // Convert lengths to an exclusive prefix sum: row_offsets[row] is where row's key
+    // starts and row_offsets[num_rows] is the total encoded size.
+    int32_t running = 0;
+    for (int64_t row = 0; row < num_rows; ++row) {
+      const int32_t row_length = row_offsets[row];
+      row_offsets[row] = running;
+      running += row_length;
+    }
+    row_offsets[num_rows] = running;
+
+    // Pass 2: encoders write each row's key bytes at the computed positions.
+    std::vector<uint8_t> encoded(running);
+    std::vector<uint8_t*> row_ptrs(num_rows);
+    for (int64_t row = 0; row < num_rows; ++row) {
+      row_ptrs[row] = encoded.data() + row_offsets[row];
+    }
+    for (int col = 0; col < batch.num_values(); ++col) {
+      RETURN_NOT_OK(encoders_[col]->Encode(batch[col], num_rows, row_ptrs.data()));
+    }
+
+    TypedBufferBuilder<uint32_t> ids_builder(ctx_->memory_pool());
+    RETURN_NOT_OK(ids_builder.Resize(num_rows));
+
+    for (int64_t row = 0; row < num_rows; ++row) {
+      const int32_t key_length = row_offsets[row + 1] - row_offsets[row];
+      std::string key(reinterpret_cast<const char*>(encoded.data() + row_offsets[row]),
+                      key_length);
+
+      const auto inserted = map_.emplace(key, num_groups_);
+      if (inserted.second) {
+        // First occurrence: the key became group num_groups_; keep its bytes so that
+        // GetUniques() can decode it later.
+        ++num_groups_;
+        // A zero-length key stores no bytes (presumably only possible when no key
+        // column contributes bytes, e.g. null-type keys).
+        if (key_length > 0) {
+          const auto start = static_cast<int32_t>(key_bytes_.size());
+          key_bytes_.resize(start + key_length);
+          offsets_.push_back(start + key_length);
+          memcpy(key_bytes_.data() + start, key.c_str(), key_length);
+        }
+      }
+      ids_builder.UnsafeAppend(inserted.first->second);
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto ids_buffer, ids_builder.Finish());
+    return Datum(UInt32Array(num_rows, std::move(ids_buffer)));
+  }
+
+  uint32_t num_groups() const override { return num_groups_; }
+
+  Result<ExecBatch> GetUniques() override {
+    std::vector<uint8_t*> key_ptrs(num_groups_);
+    for (uint32_t group = 0; group < num_groups_; ++group) {
+      key_ptrs[group] = key_bytes_.data() + offsets_[group];
+    }
+
+    ExecBatch out({}, num_groups_);
+    out.values.resize(encoders_.size());
+    for (size_t col = 0; col < encoders_.size(); ++col) {
+      ARROW_ASSIGN_OR_RAISE(
+          out.values[col],
+          encoders_[col]->Decode(key_ptrs.data(), static_cast<int32_t>(num_groups_),
+                                 ctx_->memory_pool()));
+    }
+    return out;
+  }
+
+  ExecContext* ctx_;
+  // Encoded key bytes -> group id.
+  std::unordered_map<std::string, uint32_t> map_;
+  // offsets_[g] is where group g's encoded key starts inside key_bytes_.
+  std::vector<int32_t> offsets_ = {0};
+  // Concatenated encoded keys of all groups, in group id order.
+  std::vector<uint8_t> key_bytes_;
+  uint32_t num_groups_ = 0;
+  std::vector<std::unique_ptr<internal::KeyEncoder>> encoders_;
+};
+
+// Grouper that encodes keys into a row-oriented RowTableImpl (via RowTableEncoder) and
+// uses a SwissTable over 32-bit hashes to assign group ids. Input batches are processed
+// in mini-batches whose size starts at minibatch_size_min_ and doubles up to
+// minibatch_size_max_.
+struct GrouperFastImpl : Grouper {
+  static constexpr int kBitmapPaddingForSIMD = 64;  // bits
+  static constexpr int kPaddingForSIMD = 32;        // bytes
+
+  /// Whether this implementation supports `keys`: requires a little-endian build and
+  /// rejects large-binary-like key types.
+  static bool CanUse(const std::vector<ValueDescr>& keys) {
+#if ARROW_LITTLE_ENDIAN
+    for (size_t i = 0; i < keys.size(); ++i) {
+      const auto& key = keys[i].type;
+      if (is_large_binary_like(key->id())) {
+        return false;
+      }
+    }
+    return true;
+#else
+    return false;
+#endif
+  }
+
+  /// Build a grouper for `keys`; returns NotImplemented for unsupported key types.
+  static Result<std::unique_ptr<GrouperFastImpl>> Make(
+      const std::vector<ValueDescr>& keys, ExecContext* ctx) {
+    auto impl = ::arrow::internal::make_unique<GrouperFastImpl>();
+    impl->ctx_ = ctx;
+
+    RETURN_NOT_OK(impl->temp_stack_.Init(ctx->memory_pool(), 64 * minibatch_size_max_));
+    impl->encode_ctx_.hardware_flags =
+        arrow::internal::CpuInfo::GetInstance()->hardware_flags();
+    impl->encode_ctx_.stack = &impl->temp_stack_;
+
+    // Describe every key column to the row encoder.
+    auto num_columns = keys.size();
+    impl->col_metadata_.resize(num_columns);
+    impl->key_types_.resize(num_columns);
+    impl->dictionaries_.resize(num_columns);
+    for (size_t icol = 0; icol < num_columns; ++icol) {
+      const auto& key = keys[icol].type;
+      if (key->id() == Type::DICTIONARY) {
+        // Stored with the byte width given by the dictionary type's bit_width()
+        // (presumably the index width).
+        auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
+        ARROW_DCHECK(bit_width % 8 == 0);
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, bit_width / 8);
+      } else if (key->id() == Type::BOOL) {
+        // fixed_length == 0 marks a bit-packed column (see GetUniques()).
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, 0);
+      } else if (is_fixed_width(key->id())) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(
+            true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
+      } else if (is_binary_like(key->id())) {
+        // Variable-length column with 32-bit offsets.
+        impl->col_metadata_[icol] = KeyColumnMetadata(false, sizeof(uint32_t));
+      } else if (key->id() == Type::NA) {
+        impl->col_metadata_[icol] = KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
+      } else {
+        return Status::NotImplemented("Keys of type ", *key);
+      }
+      impl->key_types_[icol] = key;
+    }
+
+    impl->encoder_.Init(impl->col_metadata_, &impl->encode_ctx_,
+                        /* row_alignment = */ sizeof(uint64_t),
+                        /* string_alignment = */ sizeof(uint64_t));
+    RETURN_NOT_OK(impl->rows_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
+    RETURN_NOT_OK(
+        impl->rows_minibatch_.Init(ctx->memory_pool(), impl->encoder_.row_metadata()));
+    impl->minibatch_size_ = impl->minibatch_size_min_;
+
+    // SwissTable callbacks. equal_func compares the current mini-batch's encoded
+    // columns against the stored rows of candidate group ids; append_func encodes the
+    // selected mini-batch rows and appends them to rows_ (which defines the groups).
+    GrouperFastImpl* impl_ptr = impl.get();
+    auto equal_func = [impl_ptr](
+                          int num_keys_to_compare, const uint16_t* selection_may_be_null,
+                          const uint32_t* group_ids, uint32_t* out_num_keys_mismatch,
+                          uint16_t* out_selection_mismatch) {
+      KeyCompare::CompareColumnsToRows(
+          num_keys_to_compare, selection_may_be_null, group_ids, &impl_ptr->encode_ctx_,
+          out_num_keys_mismatch, out_selection_mismatch,
+          impl_ptr->encoder_.batch_all_cols(), impl_ptr->rows_);
+    };
+    auto append_func = [impl_ptr](int num_keys, const uint16_t* selection) {
+      RETURN_NOT_OK(impl_ptr->encoder_.EncodeSelected(&impl_ptr->rows_minibatch_,
+                                                      num_keys, selection));
+      return impl_ptr->rows_.AppendSelectionFrom(impl_ptr->rows_minibatch_, num_keys,
+                                                 nullptr);
+    };
+    RETURN_NOT_OK(impl->map_.init(impl->encode_ctx_.hardware_flags, ctx->memory_pool(),
+                                  impl->encode_ctx_.stack, impl->log_minibatch_max_,
+                                  equal_func, append_func));
+    impl->cols_.resize(num_columns);
+    // Trailing padding entries, presumably so SIMD hashing may touch past the last
+    // hash of a full mini-batch.
+    impl->minibatch_hashes_.resize(impl->minibatch_size_max_ +
+                                   kPaddingForSIMD / sizeof(uint32_t));
+
+    return std::move(impl);
+  }
+
+  ~GrouperFastImpl() { map_.cleanup(); }
+
+  Result<Datum> Consume(const ExecBatch& batch) override {
+    // ARROW-14027: broadcast scalar arguments for now
+    for (int i = 0; i < batch.num_values(); i++) {
+      if (batch.values[i].is_scalar()) {
+        ExecBatch expanded = batch;
+        for (int j = i; j < expanded.num_values(); j++) {
+          if (expanded.values[j].is_scalar()) {
+            ARROW_ASSIGN_OR_RAISE(
+                expanded.values[j],
+                MakeArrayFromScalar(*expanded.values[j].scalar(), expanded.length,
+                                    ctx_->memory_pool()));
+          }
+        }
+        return ConsumeImpl(expanded);
+      }
+    }
+    return ConsumeImpl(batch);
+  }
+
+  // Consume a batch whose values are all arrays.
+  Result<Datum> ConsumeImpl(const ExecBatch& batch) {
+    int64_t num_rows = batch.length;
+    int num_columns = batch.num_values();
+    // Process dictionaries
+    for (int icol = 0; icol < num_columns; ++icol) {
+      if (key_types_[icol]->id() == Type::DICTIONARY) {
+        auto data = batch[icol].array();
+        auto dict = MakeArray(data->dictionary);
+        if (dictionaries_[icol]) {
+          if (!dictionaries_[icol]->Equals(dict)) {
+            // TODO(bkietz) unify if necessary. For now, just error if any batch's
+            // dictionary differs from the first we saw for this key
+            return Status::NotImplemented("Unifying differing dictionaries");
+          }
+        } else {
+          dictionaries_[icol] = std::move(dict);
+        }
+      }
+    }
+
+    std::shared_ptr<arrow::Buffer> group_ids;
+    ARROW_ASSIGN_OR_RAISE(
+        group_ids, AllocateBuffer(sizeof(uint32_t) * num_rows, ctx_->memory_pool()));
+
+    // Wrap each key column's buffers in a KeyColumnArray view covering this batch.
+    for (int icol = 0; icol < num_columns; ++icol) {
+      const uint8_t* non_nulls = NULLPTR;
+      const uint8_t* fixedlen = NULLPTR;
+      const uint8_t* varlen = NULLPTR;
+
+      // Skip if the key's type is NULL
+      if (key_types_[icol]->id() != Type::NA) {
+        if (batch[icol].array()->buffers[0] != NULLPTR) {
+          non_nulls = batch[icol].array()->buffers[0]->data();
+        }
+        fixedlen = batch[icol].array()->buffers[1]->data();
+        if (!col_metadata_[icol].is_fixed_length) {
+          varlen = batch[icol].array()->buffers[2]->data();
+        }
+      }
+
+      int64_t offset = batch[icol].array()->offset;
+
+      auto col_base = KeyColumnArray(col_metadata_[icol], offset + num_rows, non_nulls,
+                                     fixedlen, varlen);
+
+      cols_[icol] = col_base.Slice(offset, num_rows);
+    }
+
+    // Split into smaller mini-batches
+    for (uint32_t start_row = 0; start_row < num_rows;) {
+      uint32_t batch_size_next = std::min(static_cast<uint32_t>(minibatch_size_),
+                                          static_cast<uint32_t>(num_rows) - start_row);
+
+      // Encode
+      rows_minibatch_.Clean();
+      encoder_.PrepareEncodeSelected(start_row, batch_size_next, cols_);
+
+      // Compute hash
+      Hashing32::HashMultiColumn(encoder_.batch_all_cols(), &encode_ctx_,
+                                 minibatch_hashes_.data());
+
+      // Map: look up keys already present, writing their group ids.
+      auto match_bitvector =
+          util::TempVectorHolder<uint8_t>(&temp_stack_, (batch_size_next + 7) / 8);
+      {
+        auto local_slots = util::TempVectorHolder<uint8_t>(&temp_stack_, batch_size_next);
+        map_.early_filter(batch_size_next, minibatch_hashes_.data(),
+                          match_bitvector.mutable_data(), local_slots.mutable_data());
+        map_.find(batch_size_next, minibatch_hashes_.data(),
+                  match_bitvector.mutable_data(), local_slots.mutable_data(),
+                  reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row);
+      }
+      // Collect the rows whose match bit is 0 (presumably keys not yet in the map)
+      // and insert them as new groups.
+      auto ids = util::TempVectorHolder<uint16_t>(&temp_stack_, batch_size_next);
+      int num_ids;
+      util::bit_util::bits_to_indexes(0, encode_ctx_.hardware_flags, batch_size_next,
+                                      match_bitvector.mutable_data(), &num_ids,
+                                      ids.mutable_data());
+
+      RETURN_NOT_OK(map_.map_new_keys(
+          num_ids, ids.mutable_data(), minibatch_hashes_.data(),
+          reinterpret_cast<uint32_t*>(group_ids->mutable_data()) + start_row));
+
+      start_row += batch_size_next;
+
+      // Grow the mini-batch geometrically up to the maximum.
+      if (minibatch_size_ * 2 <= minibatch_size_max_) {
+        minibatch_size_ *= 2;
+      }
+    }
+
+    return Datum(UInt32Array(batch.length, std::move(group_ids)));
+  }
+
+  uint32_t num_groups() const override { return static_cast<uint32_t>(rows_.length()); }
+
+  // Make sure padded buffers end up with the right logical size
+
+  // Bitmap for `length` bits with kBitmapPaddingForSIMD extra bits of capacity.
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBitmap(int64_t length) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBitmap(length + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, bit_util::BytesForBits(length));
+  }
+
+  // Buffer of `size` bytes with extra capacity. NOTE(review): the padding used is
+  // kBitmapPaddingForSIMD (64) *bytes*, which exceeds kPaddingForSIMD (32).
+  Result<std::shared_ptr<Buffer>> AllocatePaddedBuffer(int64_t size) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<Buffer> buf,
+        AllocateBuffer(size + kBitmapPaddingForSIMD, ctx_->memory_pool()));
+    return SliceMutableBuffer(buf, 0, size);
+  }
+
+  // Decode every stored row (one per group) back into columnar arrays.
+  Result<ExecBatch> GetUniques() override {
+    auto num_columns = static_cast<uint32_t>(col_metadata_.size());
+    int64_t num_groups = rows_.length();
+
+    std::vector<std::shared_ptr<Buffer>> non_null_bufs(num_columns);
+    std::vector<std::shared_ptr<Buffer>> fixedlen_bufs(num_columns);
+    std::vector<std::shared_ptr<Buffer>> varlen_bufs(num_columns);
+
+    // Allocate validity and fixed-length (or offsets) buffers for every column whose
+    // type is not null.
+    for (size_t i = 0; i < num_columns; ++i) {
+      if (col_metadata_[i].is_null_type) {
+        uint8_t* non_nulls = NULLPTR;
+        uint8_t* fixedlen = NULLPTR;
+        cols_[i] =
+            KeyColumnArray(col_metadata_[i], num_groups, non_nulls, fixedlen, NULLPTR);
+        continue;
+      }
+      ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
+      if (col_metadata_[i].is_fixed_length) {
+        if (col_metadata_[i].fixed_length == 0) {
+          ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i], AllocatePaddedBitmap(num_groups));
+        } else {
+          ARROW_ASSIGN_OR_RAISE(
+              fixedlen_bufs[i],
+              AllocatePaddedBuffer(num_groups * col_metadata_[i].fixed_length));
+        }
+      } else {
+        ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
+                              AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
+      }
+      cols_[i] =
+          KeyColumnArray(col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
+                         fixedlen_bufs[i]->mutable_data(), nullptr);
+    }
+
+    for (int64_t start_row = 0; start_row < num_groups;) {
+      int64_t batch_size_next =
+          std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
+      encoder_.DecodeFixedLengthBuffers(start_row, start_row, batch_size_next, rows_,
+                                        &cols_);
+      start_row += batch_size_next;
+    }
+
+    if (!rows_.metadata().is_fixed_length) {
+      // The offsets decoded above give each varbinary column's total byte size (the
+      // last offset), so the value buffers can now be allocated and filled.
+      for (size_t i = 0; i < num_columns; ++i) {
+        if (!col_metadata_[i].is_fixed_length) {
+          auto varlen_size =
+              reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
+          ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
+          cols_[i] = KeyColumnArray(
+              col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
+              fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
+        }
+      }
+
+      for (int64_t start_row = 0; start_row < num_groups;) {
+        int64_t batch_size_next =
+            std::min(num_groups - start_row, static_cast<int64_t>(minibatch_size_max_));
+        encoder_.DecodeVaryingLengthBuffers(start_row, start_row, batch_size_next, rows_,
+                                            &cols_);
+        start_row += batch_size_next;
+      }
+    }
+
+    ExecBatch out({}, num_groups);
+    out.values.resize(num_columns);
+    for (size_t i = 0; i < num_columns; ++i) {
+      if (col_metadata_[i].is_null_type) {
+        out.values[i] = ArrayData::Make(null(), num_groups, {nullptr}, num_groups);
+        continue;
+      }
+      auto valid_count = arrow::internal::CountSetBits(
+          non_null_bufs[i]->data(), /*offset=*/0, static_cast<int64_t>(num_groups));
+      // Keep this in 64 bits: narrowing num_groups / valid_count to int would
+      // overflow once there are 2^31 or more groups.
+      int64_t null_count = num_groups - valid_count;
+
+      if (col_metadata_[i].is_fixed_length) {
+        out.values[i] = ArrayData::Make(
+            key_types_[i], num_groups,
+            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i])}, null_count);
+      } else {
+        out.values[i] =
+            ArrayData::Make(key_types_[i], num_groups,
+                            {std::move(non_null_bufs[i]), std::move(fixedlen_bufs[i]),
+                             std::move(varlen_bufs[i])},
+                            null_count);
+      }
+    }
+
+    // Process dictionaries
+    for (size_t icol = 0; icol < num_columns; ++icol) {
+      if (key_types_[icol]->id() == Type::DICTIONARY) {
+        if (dictionaries_[icol]) {
+          out.values[icol].array()->dictionary = dictionaries_[icol]->data();
+        } else {
+          ARROW_ASSIGN_OR_RAISE(auto dict, MakeArrayOfNull(key_types_[icol], 0));
+          out.values[icol].array()->dictionary = dict->data();
+        }
+      }
+    }
+
+    return out;
+  }
+
+  static constexpr int log_minibatch_max_ = 10;
+  static constexpr int minibatch_size_max_ = 1 << log_minibatch_max_;
+  static constexpr int minibatch_size_min_ = 128;
+  int minibatch_size_;
+
+  ExecContext* ctx_;
+  arrow::util::TempVectorStack temp_stack_;
+  LightContext encode_ctx_;
+
+  std::vector<std::shared_ptr<arrow::DataType>> key_types_;
+  std::vector<KeyColumnMetadata> col_metadata_;
+  // Per-column views over the batch being consumed (or the uniques being decoded).
+  std::vector<KeyColumnArray> cols_;
+  std::vector<uint32_t> minibatch_hashes_;
+
+  // First dictionary seen for each dictionary-typed key column.
+  std::vector<std::shared_ptr<Array>> dictionaries_;
+
+  // One encoded row per group.
+  RowTableImpl rows_;
+  RowTableImpl rows_minibatch_;
+  RowTableEncoder encoder_;
+  SwissTable map_;
+};
+
+} // namespace
+
+Result<std::unique_ptr<Grouper>> Grouper::Make(const std::vector<ValueDescr>& descrs,
+                                               ExecContext* ctx) {
+  // Fall back to the generic hash-map grouper when the row-encoder based
+  // implementation cannot handle the key types; otherwise prefer the latter.
+  if (!GrouperFastImpl::CanUse(descrs)) {
+    return GrouperImpl::Make(descrs, ctx);
+  }
+  return GrouperFastImpl::Make(descrs, ctx);
+}
+
+Result<std::shared_ptr<ListArray>> Grouper::ApplyGroupings(const ListArray& groupings,
+                                                           const Array& array,
+                                                           ExecContext* ctx) {
+  // The flattened child of `groupings` lists row indices ordered by group; taking them
+  // from `array` lays out the values of every group contiguously.
+  const auto& indices = groupings.data()->child_data[0];
+  ARROW_ASSIGN_OR_RAISE(Datum taken,
+                        compute::Take(array, indices, TakeOptions::NoBoundsCheck(), ctx));
+  std::shared_ptr<Array> values = taken.make_array();
+
+  // Reusing the grouping offsets makes list i hold the values of group i.
+  return std::make_shared<ListArray>(list(array.type()), groupings.length(),
+                                     groupings.value_offsets(), std::move(values));
+}
+
+// Builds, for every group id in [0, num_groups), the list of positions of `ids` that
+// hold that id (a counting sort). Returns Invalid if `ids` contains nulls or any id
+// that is not less than num_groups.
+Result<std::shared_ptr<ListArray>> Grouper::MakeGroupings(const UInt32Array& ids,
+                                                          uint32_t num_groups,
+                                                          ExecContext* ctx) {
+  if (ids.null_count() != 0) {
+    return Status::Invalid("MakeGroupings with null ids");
+  }
+
+  // Widen before adding one so num_groups == UINT32_MAX does not wrap to zero.
+  ARROW_ASSIGN_OR_RAISE(
+      auto offsets, AllocateBuffer(sizeof(int32_t) * (static_cast<int64_t>(num_groups) + 1),
+                                   ctx->memory_pool()));
+  auto raw_offsets = reinterpret_cast<int32_t*>(offsets->mutable_data());
+
+  // Count occurrences of each id. Ids are validated here because raw_offsets[id]
+  // would be written out of bounds for id >= num_groups, and a DCHECK does not guard
+  // release builds.
+  std::memset(raw_offsets, 0, offsets->size());
+  const int64_t num_ids = ids.length();
+  for (int64_t i = 0; i < num_ids; ++i) {
+    const uint32_t id = ids.Value(i);
+    if (id >= num_groups) {
+      return Status::Invalid("MakeGroupings: group id ", id,
+                             " is out of range for num_groups = ", num_groups);
+    }
+    raw_offsets[id] += 1;
+  }
+
+  // Exclusive prefix sum: raw_offsets[g] becomes the start of group g's list.
+  int32_t length = 0;
+  for (uint32_t id = 0; id < num_groups; ++id) {
+    auto count = raw_offsets[id];
+    raw_offsets[id] = length;
+    length += count;
+  }
+  raw_offsets[num_groups] = length;
+  DCHECK_EQ(ids.length(), length);
+
+  // Scatter each position into its group's slot. A copy of the offsets serves as the
+  // per-group write cursors so the returned offsets stay intact.
+  ARROW_ASSIGN_OR_RAISE(auto offsets_copy,
+                        offsets->CopySlice(0, offsets->size(), ctx->memory_pool()));
+  raw_offsets = reinterpret_cast<int32_t*>(offsets_copy->mutable_data());
+
+  ARROW_ASSIGN_OR_RAISE(auto sort_indices, AllocateBuffer(sizeof(int32_t) * num_ids,
+                                                          ctx->memory_pool()));
+  auto raw_sort_indices = reinterpret_cast<int32_t*>(sort_indices->mutable_data());
+  for (int64_t i = 0; i < num_ids; ++i) {
+    raw_sort_indices[raw_offsets[ids.Value(i)]++] = static_cast<int32_t>(i);
+  }
+
+  return std::make_shared<ListArray>(
+      list(int32()), num_groups, std::move(offsets),
+      std::make_shared<Int32Array>(num_ids, std::move(sort_indices)));
+}
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h
new file mode 100644
index 0000000000..8281b75317
--- /dev/null
+++ b/cpp/src/arrow/compute/row/grouper.h
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/compute/exec.h"
+#include "arrow/compute/exec/options.h"
+#include "arrow/compute/kernel.h"
+#include "arrow/datum.h"
+#include "arrow/result.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+namespace compute {
+
+/// Consumes batches of keys and yields batches of the group ids.
+///
+/// A Grouper keeps state across Consume() calls: identical key combinations seen in
+/// different batches receive the same group id.
+class ARROW_EXPORT Grouper {
+ public:
+ virtual ~Grouper() = default;
+
+ /// Construct a Grouper which receives the specified key types.
+ /// Returns NotImplemented if a key type is not supported.
+ static Result<std::unique_ptr<Grouper>> Make(const std::vector<ValueDescr>& descrs,
+ ExecContext* ctx = default_exec_context());
+
+ /// Consume a batch of keys, producing the corresponding group ids as an integer array
+ /// with one id per row of `batch`.
+ /// Currently only uint32 indices will be produced, eventually the bit width will only
+ /// be as wide as necessary.
+ virtual Result<Datum> Consume(const ExecBatch& batch) = 0;
+
+ /// Get current unique keys, one row per group. May be called multiple times.
+ virtual Result<ExecBatch> GetUniques() = 0;
+
+ /// Get the current number of groups.
+ virtual uint32_t num_groups() const = 0;
+
+ /// \brief Assemble lists of indices of identical elements.
+ ///
+ /// \param[in] ids An unsigned, all-valid integral array which will be
+ /// used as grouping criteria.
+ /// \param[in] num_groups An exclusive upper bound for the elements of ids: every
+ /// element must be less than num_groups.
+ /// \param[in] ctx Execution context to use during the operation
+ /// \return A num_groups-long ListArray where the slot at i contains a
+ /// list of indices where i appears in ids, in increasing order.
+ ///
+ /// MakeGroupings([
+ /// 2,
+ /// 2,
+ /// 5,
+ /// 5,
+ /// 2,
+ /// 3
+ /// ], 8) == [
+ /// [],
+ /// [],
+ /// [0, 1, 4],
+ /// [5],
+ /// [],
+ /// [2, 3],
+ /// [],
+ /// []
+ /// ]
+ static Result<std::shared_ptr<ListArray>> MakeGroupings(
+ const UInt32Array& ids, uint32_t num_groups,
+ ExecContext* ctx = default_exec_context());
+
+ /// \brief Produce a ListArray whose slots are selections of `array` which correspond to
+ /// the provided groupings.
+ ///
+ /// The values of `groupings` are used as indices into `array` without bounds
+ /// checking, so they must all be valid positions in `array`.
+ ///
+ /// For example,
+ /// ApplyGroupings([
+ /// [],
+ /// [],
+ /// [0, 1, 4],
+ /// [5],
+ /// [],
+ /// [2, 3],
+ /// [],
+ /// []
+ /// ], [2, 2, 5, 5, 2, 3]) == [
+ /// [],
+ /// [],
+ /// [2, 2, 2],
+ /// [3],
+ /// [],
+ /// [5, 5],
+ /// [],
+ /// []
+ /// ]
+ static Result<std::shared_ptr<ListArray>> ApplyGroupings(
+ const ListArray& groupings, const Array& array,
+ ExecContext* ctx = default_exec_context());
+};
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/row/row_internal.cc b/cpp/src/arrow/compute/row/row_internal.cc
new file mode 100644
index 0000000000..e99ff75d64
--- /dev/null
+++ b/cpp/src/arrow/compute/row/row_internal.cc
@@ -0,0 +1,409 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/row/row_internal.h"
+
+#include "arrow/compute/exec/util.h"
+
+namespace arrow {
+namespace compute {
+
+ uint32_t RowTableMetadata::num_varbinary_cols() const {
+   // A varbinary (varying-length) column is any column not flagged as fixed-length.
+   uint32_t num_varbinary = 0;
+   for (const KeyColumnMetadata& col : column_metadatas) {
+     if (col.is_fixed_length) {
+       continue;
+     }
+     ++num_varbinary;
+   }
+   return num_varbinary;
+ }
+
+ bool RowTableMetadata::is_compatible(const RowTableMetadata& other) const {
+   // Compatible means: same number of columns, same row/string alignment, and a
+   // column-by-column match of the fixed-length flag and fixed_length value.
+   const bool same_alignment = row_alignment == other.row_alignment &&
+                               string_alignment == other.string_alignment;
+   if (num_cols() != other.num_cols() || !same_alignment) {
+     return false;
+   }
+   for (size_t icol = 0; icol < column_metadatas.size(); ++icol) {
+     const KeyColumnMetadata& mine = column_metadatas[icol];
+     const KeyColumnMetadata& theirs = other.column_metadatas[icol];
+     if (mine.is_fixed_length != theirs.is_fixed_length ||
+         mine.fixed_length != theirs.fixed_length) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ void RowTableMetadata::FromColumnMetadataVector(
+     const std::vector<KeyColumnMetadata>& cols, int in_row_alignment,
+     int in_string_alignment) {
+   column_metadatas = cols;
+
+   const auto num_cols = static_cast<uint32_t>(cols.size());
+
+   // Sort columns.
+   //
+   // Columns are sorted based on the size in bytes of their fixed-length part.
+   // For a varying-length column, the fixed-length part is the 32-bit field storing
+   // the cumulative length of varying-length fields.
+   //
+   // The rules are:
+   //
+   // a) Columns whose fixed-length part is a power of 2 precede all other columns.
+   // This group includes varying-length columns (4 bytes) and boolean columns
+   // (marked with fixed-length 0).
+   //
+   // b) Within that group, columns are sorted in decreasing order of the size of
+   // their fixed-length part. A boolean column sorts as size 0, i.e. after 1-byte
+   // columns, even though it occupies 1 byte in the row.
+   //
+   // c) Fixed-length columns precede varying-length columns when
+   // both have the same size fixed-length part.
+   //
+   // d) Remaining ties, and all columns whose fixed-length part is not a power of 2,
+   // keep their original relative order.
+   //
+   column_order.resize(num_cols);
+   for (uint32_t i = 0; i < num_cols; ++i) {
+     column_order[i] = i;
+   }
+   std::sort(
+       column_order.begin(), column_order.end(), [&cols](uint32_t left, uint32_t right) {
+         bool is_left_pow2 =
+             !cols[left].is_fixed_length || ARROW_POPCOUNT64(cols[left].fixed_length) <= 1;
+         bool is_right_pow2 = !cols[right].is_fixed_length ||
+                              ARROW_POPCOUNT64(cols[right].fixed_length) <= 1;
+         bool is_left_fixedlen = cols[left].is_fixed_length;
+         bool is_right_fixedlen = cols[right].is_fixed_length;
+         uint32_t width_left =
+             cols[left].is_fixed_length ? cols[left].fixed_length : sizeof(uint32_t);
+         uint32_t width_right =
+             cols[right].is_fixed_length ? cols[right].fixed_length : sizeof(uint32_t);
+         if (is_left_pow2 != is_right_pow2) {
+           return is_left_pow2;
+         }
+         if (!is_left_pow2) {
+           return left < right;
+         }
+         if (width_left != width_right) {
+           return width_left > width_right;
+         }
+         if (is_left_fixedlen != is_right_fixedlen) {
+           return is_left_fixedlen;
+         }
+         return left < right;
+       });
+
+   row_alignment = in_row_alignment;
+   string_alignment = in_string_alignment;
+   varbinary_end_array_offset = 0;
+
+   column_offsets.resize(num_cols);
+   // Named so as not to shadow the num_varbinary_cols() member function.
+   uint32_t num_varbinary_seen = 0;
+   uint32_t offset_within_row = 0;
+   for (uint32_t i = 0; i < num_cols; ++i) {
+     const KeyColumnMetadata& col = cols[column_order[i]];
+     // Only fixed-length columns whose width is not a power of 2 are aligned to
+     // string_alignment; the helper returns 0 padding for every other column.
+     offset_within_row += RowTableMetadata::padding_for_alignment(offset_within_row,
+                                                                  string_alignment, col);
+     column_offsets[i] = offset_within_row;
+     if (!col.is_fixed_length) {
+       // The 32-bit end offsets of all varbinary columns form one contiguous array
+       // starting at varbinary_end_array_offset (guaranteed by the sort order).
+       if (num_varbinary_seen == 0) {
+         varbinary_end_array_offset = offset_within_row;
+       }
+       DCHECK(column_offsets[i] - varbinary_end_array_offset ==
+              num_varbinary_seen * sizeof(uint32_t));
+       ++num_varbinary_seen;
+       offset_within_row += sizeof(uint32_t);
+     } else {
+       // Boolean column is a bit-vector, which is indicated by
+       // setting fixed length in column metadata to zero.
+       // It will be stored as a byte in output row.
+       offset_within_row += (col.fixed_length == 0) ? 1 : col.fixed_length;
+     }
+   }
+
+   is_fixed_length = (num_varbinary_seen == 0);
+   fixed_length =
+       offset_within_row +
+       RowTableMetadata::padding_for_alignment(
+           offset_within_row, num_varbinary_seen == 0 ? row_alignment : string_alignment);
+
+   // We set the number of bytes per row storing null masks of individual key columns
+   // to be a power of two. This is not required. It could be also set to the minimal
+   // number of bytes required for a given number of bits (one bit per column).
+   null_masks_bytes_per_row = 1;
+   while (static_cast<uint32_t>(null_masks_bytes_per_row * 8) < num_cols) {
+     null_masks_bytes_per_row *= 2;
+   }
+ }
+
+RowTableImpl::RowTableImpl() : pool_(nullptr), rows_capacity_(0), bytes_capacity_(0) {}
+
+ Status RowTableImpl::Init(MemoryPool* pool, const RowTableMetadata& metadata) {
+   pool_ = pool;
+   metadata_ = metadata;
+
+   // Init is meant to run once, before any buffer has been allocated.
+   DCHECK(!null_masks_ && !offsets_ && !rows_);
+
+   constexpr int64_t kInitialRowsCapacity = 8;
+   constexpr int64_t kInitialBytesCapacity = 1024;
+
+   // Null masks: sized for the initial row capacity and zero-filled.
+   const int64_t null_masks_size = size_null_masks(kInitialRowsCapacity);
+   ARROW_ASSIGN_OR_RAISE(null_masks_, AllocateResizableBuffer(null_masks_size, pool_));
+   memset(null_masks_->mutable_data(), 0, null_masks_size);
+
+   if (metadata.is_fixed_length) {
+     // Fixed-length rows: the rows buffer is sized from the row capacity.
+     const int64_t rows_size = size_rows_fixed_length(kInitialRowsCapacity);
+     ARROW_ASSIGN_OR_RAISE(rows_, AllocateResizableBuffer(rows_size, pool_));
+     memset(rows_->mutable_data(), 0, rows_size);
+     bytes_capacity_ = rows_size - kPaddingForVectors;
+   } else {
+     // Varying-length rows: an offsets buffer plus a rows buffer sized in bytes.
+     const int64_t offsets_size = size_offsets(kInitialRowsCapacity);
+     ARROW_ASSIGN_OR_RAISE(offsets_, AllocateResizableBuffer(offsets_size, pool_));
+     memset(offsets_->mutable_data(), 0, offsets_size);
+     reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+
+     const int64_t rows_size = size_rows_varying_length(kInitialBytesCapacity);
+     ARROW_ASSIGN_OR_RAISE(rows_, AllocateResizableBuffer(rows_size, pool_));
+     memset(rows_->mutable_data(), 0, rows_size);
+     bytes_capacity_ = rows_size - kPaddingForVectors;
+   }
+
+   UpdateBufferPointers();
+
+   rows_capacity_ = kInitialRowsCapacity;
+   num_rows_ = 0;
+   num_rows_for_has_any_nulls_ = 0;
+   has_any_nulls_ = false;
+
+   return Status::OK();
+ }
+
+ void RowTableImpl::Clean() {
+   // Logically drop every row; buffers and their capacity are kept for reuse.
+   num_rows_ = 0;
+   num_rows_for_has_any_nulls_ = 0;
+   has_any_nulls_ = false;
+
+   if (metadata_.is_fixed_length) {
+     return;
+   }
+   // An empty varying-length table still needs its leading offset (row 0 starts
+   // at byte 0).
+   reinterpret_cast<uint32_t*>(offsets_->mutable_data())[0] = 0;
+ }
+
+ // Byte sizes of the individual buffers. Each one carries kPaddingForVectors extra
+ // bytes at the end so that vectorized code may touch a little past the data.
+
+ int64_t RowTableImpl::size_null_masks(int64_t num_rows) const {
+   const int64_t payload = num_rows * metadata_.null_masks_bytes_per_row;
+   return payload + kPaddingForVectors;
+ }
+
+ int64_t RowTableImpl::size_offsets(int64_t num_rows) const {
+   // One more entry than rows: the last entry marks the end of the last row.
+   const int64_t num_entries = num_rows + 1;
+   return num_entries * static_cast<int64_t>(sizeof(uint32_t)) + kPaddingForVectors;
+ }
+
+ int64_t RowTableImpl::size_rows_fixed_length(int64_t num_rows) const {
+   const int64_t payload = num_rows * metadata_.fixed_length;
+   return payload + kPaddingForVectors;
+ }
+
+ int64_t RowTableImpl::size_rows_varying_length(int64_t num_bytes) const {
+   return kPaddingForVectors + num_bytes;
+ }
+
+ void RowTableImpl::UpdateBufferPointers() {
+   // Re-cache raw pointers; must run whenever a buffer may have been reallocated.
+   // Layout: [0] null masks, then [1] rows for fixed-length tables, or
+   // [1] offsets and [2] rows for varying-length tables.
+   const bool fixed = metadata_.is_fixed_length;
+   buffers_[0] = null_masks_->mutable_data();
+   buffers_[1] = fixed ? rows_->mutable_data() : offsets_->mutable_data();
+   buffers_[2] = fixed ? nullptr : rows_->mutable_data();
+ }
+
+ // Grows the per-row buffers (null masks, plus offsets or fixed-length rows) so that
+ // `num_extra_rows` more rows fit. Newly added bytes are zero-filled.
+ Status RowTableImpl::ResizeFixedLengthBuffers(int64_t num_extra_rows) {
+   const int64_t num_rows_needed = num_rows_ + num_extra_rows;
+   if (rows_capacity_ >= num_rows_needed) {
+     return Status::OK();
+   }
+
+   // Grow geometrically (at least doubling) until the requested rows fit.
+   int64_t rows_capacity_new = std::max(static_cast<int64_t>(1), 2 * rows_capacity_);
+   while (rows_capacity_new < num_rows_needed) {
+     rows_capacity_new *= 2;
+   }
+
+   // Null masks. Resize() may reallocate, so the cached pointers in buffers_ are
+   // refreshed right away: otherwise, if the next Resize() failed, we would return
+   // with buffers_[0] still pointing at the freed allocation.
+   RETURN_NOT_OK(null_masks_->Resize(size_null_masks(rows_capacity_new), false));
+   UpdateBufferPointers();
+   memset(null_masks_->mutable_data() + size_null_masks(rows_capacity_), 0,
+          size_null_masks(rows_capacity_new) - size_null_masks(rows_capacity_));
+
+   // Either offsets or rows
+   if (!metadata_.is_fixed_length) {
+     RETURN_NOT_OK(offsets_->Resize(size_offsets(rows_capacity_new), false));
+     memset(offsets_->mutable_data() + size_offsets(rows_capacity_), 0,
+            size_offsets(rows_capacity_new) - size_offsets(rows_capacity_));
+   } else {
+     RETURN_NOT_OK(rows_->Resize(size_rows_fixed_length(rows_capacity_new), false));
+     memset(rows_->mutable_data() + size_rows_fixed_length(rows_capacity_), 0,
+            size_rows_fixed_length(rows_capacity_new) -
+                size_rows_fixed_length(rows_capacity_));
+     bytes_capacity_ = size_rows_fixed_length(rows_capacity_new) - kPaddingForVectors;
+   }
+
+   UpdateBufferPointers();
+
+   rows_capacity_ = rows_capacity_new;
+
+   return Status::OK();
+ }
+
+ // Grows the varying-length rows buffer so that `num_extra_bytes` more bytes fit
+ // after the current end of data. No-op for fixed-length tables.
+ Status RowTableImpl::ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes) {
+   // Check the table kind before touching offsets(): a fixed-length table has no
+   // offsets buffer, and buffers_[1] then points at the rows buffer, so reading
+   // offsets()[num_rows_] could run past its end (e.g. with 1-byte rows).
+   if (metadata_.is_fixed_length) {
+     return Status::OK();
+   }
+
+   const int64_t num_bytes = offsets()[num_rows_];
+   const int64_t num_bytes_needed = num_bytes + num_extra_bytes;
+   if (bytes_capacity_ >= num_bytes_needed) {
+     return Status::OK();
+   }
+
+   // Grow geometrically (at least doubling) until the requested bytes fit.
+   int64_t bytes_capacity_new = std::max(static_cast<int64_t>(1), 2 * bytes_capacity_);
+   while (bytes_capacity_new < num_bytes_needed) {
+     bytes_capacity_new *= 2;
+   }
+
+   RETURN_NOT_OK(rows_->Resize(size_rows_varying_length(bytes_capacity_new), false));
+   memset(rows_->mutable_data() + size_rows_varying_length(bytes_capacity_), 0,
+          size_rows_varying_length(bytes_capacity_new) -
+              size_rows_varying_length(bytes_capacity_));
+
+   UpdateBufferPointers();
+
+   bytes_capacity_ = bytes_capacity_new;
+
+   return Status::OK();
+ }
+
+ // Appends rows of `from` (all of them in order, or those listed in source_row_ids)
+ // to the end of this table. Both tables must have compatible metadata.
+ Status RowTableImpl::AppendSelectionFrom(const RowTableImpl& from,
+                                          uint32_t num_rows_to_append,
+                                          const uint16_t* source_row_ids) {
+   DCHECK(metadata_.is_compatible(from.metadata()));
+
+   RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+
+   // Index in `from` of the i-th appended row. Kept as uint32_t: storing it in a
+   // uint16_t would silently wrap for unselected appends of more than 65535 rows.
+   auto source_row_id = [source_row_ids](uint32_t i) -> uint32_t {
+     return source_row_ids ? source_row_ids[i] : i;
+   };
+
+   if (!metadata_.is_fixed_length) {
+     // Varying-length rows: compute the new offsets first...
+     auto from_offsets = reinterpret_cast<const uint32_t*>(from.offsets_->data());
+     auto to_offsets = reinterpret_cast<uint32_t*>(offsets_->mutable_data());
+     const uint32_t total_length = to_offsets[num_rows_];
+     uint32_t total_length_to_append = 0;
+     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+       const uint32_t row_id = source_row_id(i);
+       total_length_to_append += from_offsets[row_id + 1] - from_offsets[row_id];
+       to_offsets[num_rows_ + i + 1] = total_length + total_length_to_append;
+     }
+
+     // ...then make room for the bytes. This may reallocate rows_, so the
+     // destination pointer is only taken afterwards.
+     RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(total_length_to_append));
+
+     const uint8_t* src = from.rows_->data();
+     uint8_t* dst = rows_->mutable_data() + total_length;
+     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+       const uint32_t row_id = source_row_id(i);
+       const uint32_t length = from_offsets[row_id + 1] - from_offsets[row_id];
+       // Copy whole 64-bit words: up to 7 bytes past the row are touched, which
+       // stays within the kPaddingForVectors bytes at the end of each buffer.
+       auto src64 = reinterpret_cast<const uint64_t*>(src + from_offsets[row_id]);
+       auto dst64 = reinterpret_cast<uint64_t*>(dst);
+       const int64_t num_words = bit_util::CeilDiv(length, 8);
+       for (int64_t j = 0; j < num_words; ++j) {
+         dst64[j] = src64[j];
+       }
+       dst += length;
+     }
+   } else {
+     // Fixed-length rows: every row is metadata_.fixed_length bytes long.
+     const uint32_t length = metadata_.fixed_length;
+     const int64_t num_words = bit_util::CeilDiv(length, 8);
+     const uint8_t* src = from.rows_->data();
+     uint8_t* dst = rows_->mutable_data() + num_rows_ * length;
+     for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+       // 64-bit arithmetic: length * row_id can exceed 32 bits for wide rows.
+       const int64_t src_offset = static_cast<int64_t>(length) * source_row_id(i);
+       auto src64 = reinterpret_cast<const uint64_t*>(src + src_offset);
+       auto dst64 = reinterpret_cast<uint64_t*>(dst);
+       for (int64_t j = 0; j < num_words; ++j) {
+         dst64[j] = src64[j];
+       }
+       dst += length;
+     }
+   }
+
+   // Null masks: copy the null_masks_bytes_per_row bytes of every selected row.
+   const uint32_t byte_length = metadata_.null_masks_bytes_per_row;
+   const uint8_t* src_base = from.null_masks_->data();
+   uint8_t* dst = null_masks_->mutable_data() + num_rows_ * byte_length;
+   for (uint32_t i = 0; i < num_rows_to_append; ++i) {
+     const int64_t src_byte_offset = static_cast<int64_t>(source_row_id(i)) * byte_length;
+     memcpy(dst, src_base + src_byte_offset, byte_length);
+     dst += byte_length;
+   }
+
+   num_rows_ += num_rows_to_append;
+
+   return Status::OK();
+ }
+
+ // Reserves space for `num_rows_to_append` rows (and, for varying-length tables,
+ // `num_extra_bytes_to_append` data bytes) and counts them as part of the table.
+ // The caller is expected to fill in the new rows.
+ Status RowTableImpl::AppendEmpty(uint32_t num_rows_to_append,
+                                  uint32_t num_extra_bytes_to_append) {
+   RETURN_NOT_OK(ResizeFixedLengthBuffers(num_rows_to_append));
+   RETURN_NOT_OK(ResizeOptionalVaryingLengthBuffer(num_extra_bytes_to_append));
+
+   // Byte offset in the rows buffer at which the first appended row starts. The
+   // resizes above guarantee that this is <= bytes_capacity_.
+   const int64_t first_new_byte =
+       metadata_.is_fixed_length ? num_rows_ * metadata_.fixed_length
+                                 : static_cast<int64_t>(offsets()[num_rows_]);
+   num_rows_ += num_rows_to_append;
+
+   // With alignment > 1 the space for the new rows is zero-filled so that any
+   // alignment padding the caller leaves unwritten reads as zero. Only the region
+   // past the existing rows is cleared; clearing from the start of the buffer
+   // would wipe rows already stored in the table. On a freshly Clean()ed table
+   // first_new_byte is 0, so the whole buffer is cleared as before.
+   if (metadata_.row_alignment > 1 || metadata_.string_alignment > 1) {
+     memset(rows_->mutable_data() + first_new_byte, 0, bytes_capacity_ - first_new_byte);
+   }
+   return Status::OK();
+ }
+
+ bool RowTableImpl::has_any_nulls(const LightContext* ctx) const {
+   // Memoized: only rows appended since the previous call are scanned, and once a
+   // null has been seen the answer stays true.
+   if (!has_any_nulls_ && num_rows_for_has_any_nulls_ < num_rows_) {
+     const auto bytes_per_row = metadata().null_masks_bytes_per_row;
+     const uint8_t* first_unscanned =
+         null_masks() + bytes_per_row * num_rows_for_has_any_nulls_;
+     const auto num_unscanned_bytes = static_cast<uint32_t>(
+         bytes_per_row * (num_rows_ - num_rows_for_has_any_nulls_));
+     has_any_nulls_ = !util::bit_util::are_all_bytes_zero(
+         ctx->hardware_flags, first_unscanned, num_unscanned_bytes);
+     num_rows_for_has_any_nulls_ = num_rows_;
+   }
+   return has_any_nulls_;
+ }
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/row/row_internal.h b/cpp/src/arrow/compute/row/row_internal.h
new file mode 100644
index 0000000000..d46ac0e9a9
--- /dev/null
+++ b/cpp/src/arrow/compute/row/row_internal.h
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/compute/light_array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+namespace arrow {
+namespace compute {
+
+ /// Description of the data stored in a RowTable
+ struct ARROW_EXPORT RowTableMetadata {
+   /// \brief True if there are no variable length columns in the table
+   bool is_fixed_length;
+
+   /// For a fixed-length binary row, common size of rows in bytes,
+   /// rounded up to the multiple of alignment.
+   ///
+   /// For a varying-length binary, size of all encoded fixed-length key columns,
+   /// including lengths of varying-length columns, rounded up to the multiple of string
+   /// alignment.
+   uint32_t fixed_length;
+
+   /// Offset within a row to the array of 32-bit offsets within a row of
+   /// ends of varbinary fields.
+   /// Used only when the row is not fixed-length, zero for fixed-length row.
+   /// There are N elements for N varbinary fields.
+   /// Each element is the offset within a row of the first byte after
+   /// the corresponding varbinary field bytes in that row.
+   /// If varbinary fields begin at aligned addresses, then the end of the previous
+   /// varbinary field needs to be rounded up according to the specified alignment
+   /// to obtain the beginning of the next varbinary field.
+   /// The first varbinary field starts at offset specified by fixed_length,
+   /// which should already be aligned.
+   uint32_t varbinary_end_array_offset;
+
+   /// Fixed number of bytes per row that are used to encode null masks.
+   /// Null masks indicate for a single row which of its columns are null.
+   /// Nth bit in the sequence of bytes assigned to a row represents null
+   /// information for Nth field according to the order in which they are encoded.
+   int null_masks_bytes_per_row;
+
+   /// Power of 2. Every row will start at an offset aligned to that number of bytes.
+   int row_alignment;
+
+   /// Power of 2. Must be no greater than row alignment.
+   /// Every non-power-of-2 binary field and every varbinary field bytes
+   /// will start aligned to that number of bytes.
+   int string_alignment;
+
+   /// Metadata of encoded columns in their original order.
+   std::vector<KeyColumnMetadata> column_metadatas;
+
+   /// Order in which fields are encoded: column_order[i] is the original index
+   /// of the i-th encoded column.
+   std::vector<uint32_t> column_order;
+
+   /// Offsets within a row to fields in their encoding order.
+   std::vector<uint32_t> column_offsets;
+
+   /// Number of padding bytes needed to round `offset` up to the nearest multiple
+   /// of `required_alignment`, which must be a power of 2.
+   static inline uint32_t padding_for_alignment(uint32_t offset, int required_alignment) {
+     ARROW_DCHECK(ARROW_POPCOUNT64(required_alignment) == 1);
+     // (-offset) mod alignment is the distance to the next multiple. The negation
+     // is done in unsigned arithmetic, which is defined to wrap; converting to
+     // int32_t first is implementation-defined for offsets >= 2^31 and negating
+     // INT32_MIN (offset == 2^31) is undefined behavior.
+     return (0u - offset) & static_cast<uint32_t>(required_alignment - 1);
+   }
+
+   /// Rounding up offset to the beginning of next column,
+   /// choosing required alignment based on the data type of that column.
+   /// Only fixed-length columns whose width is not a power of 2 are padded (to
+   /// `string_alignment`); for every other column this returns 0.
+   static inline uint32_t padding_for_alignment(uint32_t offset, int string_alignment,
+                                                const KeyColumnMetadata& col_metadata) {
+     if (!col_metadata.is_fixed_length ||
+         ARROW_POPCOUNT64(col_metadata.fixed_length) <= 1) {
+       return 0;
+     } else {
+       return padding_for_alignment(offset, string_alignment);
+     }
+   }
+
+   /// Returns an array of offsets within a row of ends of varbinary fields.
+   inline const uint32_t* varbinary_end_array(const uint8_t* row) const {
+     ARROW_DCHECK(!is_fixed_length);
+     return reinterpret_cast<const uint32_t*>(row + varbinary_end_array_offset);
+   }
+
+   /// \brief An array of mutable offsets within a row of ends of varbinary fields.
+   inline uint32_t* varbinary_end_array(uint8_t* row) const {
+     ARROW_DCHECK(!is_fixed_length);
+     return reinterpret_cast<uint32_t*>(row + varbinary_end_array_offset);
+   }
+
+   /// Computes the offset within the row and the length of the first varbinary
+   /// field, written to `*offset` and `*length`. The field starts right after the
+   /// fixed-length part of the row (at `fixed_length`).
+   inline void first_varbinary_offset_and_length(const uint8_t* row, uint32_t* offset,
+                                                 uint32_t* length) const {
+     ARROW_DCHECK(!is_fixed_length);
+     *offset = fixed_length;
+     *length = varbinary_end_array(row)[0] - fixed_length;
+   }
+
+   /// Computes the offset within the row and the length of the varbinary field
+   /// `varbinary_id`, which must be > 0 (use first_varbinary_offset_and_length for
+   /// the first field). The field starts at the end of the previous field rounded
+   /// up to `string_alignment`. Results are written to `*out_offset` and
+   /// `*out_length`.
+   inline void nth_varbinary_offset_and_length(const uint8_t* row, int varbinary_id,
+                                               uint32_t* out_offset,
+                                               uint32_t* out_length) const {
+     ARROW_DCHECK(!is_fixed_length);
+     ARROW_DCHECK(varbinary_id > 0);
+     const uint32_t* varbinary_end = varbinary_end_array(row);
+     uint32_t offset = varbinary_end[varbinary_id - 1];
+     offset += padding_for_alignment(offset, string_alignment);
+     *out_offset = offset;
+     *out_length = varbinary_end[varbinary_id] - offset;
+   }
+
+   /// Original index of the `icol`-th encoded column.
+   uint32_t encoded_field_order(uint32_t icol) const { return column_order[icol]; }
+
+   /// Offset within a row of the `icol`-th encoded column.
+   uint32_t encoded_field_offset(uint32_t icol) const { return column_offsets[icol]; }
+
+   uint32_t num_cols() const { return static_cast<uint32_t>(column_metadatas.size()); }
+
+   /// Number of columns that are not fixed-length.
+   uint32_t num_varbinary_cols() const;
+
+   /// \brief Populate this instance to describe `cols` with the given alignment
+   void FromColumnMetadataVector(const std::vector<KeyColumnMetadata>& cols,
+                                 int in_row_alignment, int in_string_alignment);
+
+   /// \brief True if `other` has the same row and string alignment, the same
+   /// number of columns, and each column has the same `is_fixed_length` flag and
+   /// the same `fixed_length` value
+   bool is_compatible(const RowTableMetadata& other) const;
+ };
+
+ /// \brief A table of data stored in row-major order
+ ///
+ /// Can only store non-nested data types
+ ///
+ /// Can store both fixed-size data types and variable-length data types
+ ///
+ /// The row table is not thread-safe. Even const methods must not be called
+ /// concurrently: has_any_nulls() updates memoized (mutable) state.
+ class ARROW_EXPORT RowTableImpl {
+  public:
+   RowTableImpl();
+   /// \brief Initialize the table for use
+   ///
+   /// This must be called before any other method
+   Status Init(MemoryPool* pool, const RowTableMetadata& metadata);
+   /// \brief Clear all rows from the table
+   ///
+   /// Does not shrink buffers
+   void Clean();
+   /// \brief Add empty rows
+   /// \param num_rows_to_append The number of empty rows to append
+   /// \param num_extra_bytes_to_append For tables storing variable-length data this
+   /// should be a guess of how many data bytes will be needed to populate the
+   /// data. This is ignored if there are no variable-length columns
+   Status AppendEmpty(uint32_t num_rows_to_append, uint32_t num_extra_bytes_to_append);
+   /// \brief Append rows from a source table
+   /// \param from The table to append from; its metadata must be compatible with
+   /// this table's (see RowTableMetadata::is_compatible)
+   /// \param num_rows_to_append The number of rows to append
+   /// \param source_row_ids Indices (into `from`) of the desired rows, or null to
+   /// append the first `num_rows_to_append` rows of `from` in order
+   Status AppendSelectionFrom(const RowTableImpl& from, uint32_t num_rows_to_append,
+                              const uint16_t* source_row_ids);
+   /// \brief Metadata describing the data stored in this table
+   const RowTableMetadata& metadata() const { return metadata_; }
+   /// \brief The number of rows stored in the table
+   int64_t length() const { return num_rows_; }
+   // Accessors into the table's buffers. Buffer 0 holds the null masks. For a
+   // fixed-length table buffer 1 holds the rows; otherwise buffer 1 holds the
+   // offsets and buffer 2 the rows.
+   const uint8_t* data(int i) const {
+     ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+     return buffers_[i];
+   }
+   uint8_t* mutable_data(int i) {
+     ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+     return buffers_[i];
+   }
+   // Row offsets into the varying-length data; only meaningful when the table has
+   // variable-length columns (otherwise buffer 1 holds the rows).
+   const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
+   uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
+   const uint8_t* null_masks() const { return null_masks_->data(); }
+   uint8_t* null_masks() { return null_masks_->mutable_data(); }
+
+   /// \brief True if there is a null value anywhere in the table
+   ///
+   /// This calculation is memoized based on the number of rows and assumes
+   /// that values are only appended (and not modified in place) between
+   /// successive calls
+   bool has_any_nulls(const LightContext* ctx) const;
+
+  private:
+   Status ResizeFixedLengthBuffers(int64_t num_extra_rows);
+   Status ResizeOptionalVaryingLengthBuffer(int64_t num_extra_bytes);
+
+   // Helper functions to determine the number of bytes needed for each
+   // buffer given a number of rows.
+   int64_t size_null_masks(int64_t num_rows) const;
+   int64_t size_offsets(int64_t num_rows) const;
+   int64_t size_rows_fixed_length(int64_t num_rows) const;
+   int64_t size_rows_varying_length(int64_t num_bytes) const;
+
+   // Called after resize to fix pointers
+   void UpdateBufferPointers();
+
+   // The arrays in `buffers_` need to be padded so that
+   // vectorized operations can operate in blocks without
+   // worrying about tails
+   static constexpr int64_t kPaddingForVectors = 64;
+   MemoryPool* pool_ = nullptr;
+   RowTableMetadata metadata_;
+   // Buffers can only expand during lifetime and never shrink.
+   std::unique_ptr<ResizableBuffer> null_masks_;
+   // Only used if the table has variable-length columns
+   // Stores the offsets into the binary data (which is stored
+   // after all the fixed-sized fields)
+   std::unique_ptr<ResizableBuffer> offsets_;
+   // Stores the fixed-length parts of the rows
+   std::unique_ptr<ResizableBuffer> rows_;
+   static constexpr int kMaxBuffers = 3;
+   uint8_t* buffers_[kMaxBuffers] = {};
+   // The number of rows in the table
+   int64_t num_rows_ = 0;
+   // The number of rows that can be stored in the table without resizing
+   int64_t rows_capacity_ = 0;
+   // The number of bytes that can be stored in the table without resizing
+   int64_t bytes_capacity_ = 0;
+
+   // Mutable to allow lazy evaluation
+   mutable int64_t num_rows_for_has_any_nulls_ = 0;
+   mutable bool has_any_nulls_ = false;
+ };
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index 89d90c758f..7108ff452f 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -26,11 +26,11 @@
#include "arrow/array/array_dict.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/builder_dict.h"
-#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/cast.h"
#include "arrow/compute/exec/expression_internal.h"
+#include "arrow/compute/row/grouper.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/scalar.h"
@@ -141,14 +141,13 @@ Result<Partitioning::PartitionedBatches> KeyValuePartitioning::Partition(
key_batch.values.emplace_back(batch->column_data(i));
}
- ARROW_ASSIGN_OR_RAISE(auto grouper,
- compute::internal::Grouper::Make(key_batch.GetDescriptors()));
+ ARROW_ASSIGN_OR_RAISE(auto grouper, compute::Grouper::Make(key_batch.GetDescriptors()));
ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch));
auto ids = id_batch.array_as<UInt32Array>();
- ARROW_ASSIGN_OR_RAISE(auto groupings, compute::internal::Grouper::MakeGroupings(
- *ids, grouper->num_groups()));
+ ARROW_ASSIGN_OR_RAISE(auto groupings,
+ compute::Grouper::MakeGroupings(*ids, grouper->num_groups()));
ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
ArrayVector unique_arrays(num_keys);
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index a3cb044855..39fc130c8a 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2391,7 +2391,7 @@ cdef extern from * namespace "arrow::compute":
const CBuffer& buffer)
-cdef extern from "arrow/compute/api_aggregate.h" namespace \
+cdef extern from "arrow/compute/exec/aggregate.h" namespace \
"arrow::compute::internal" nogil:
cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
c_string function