You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/04/21 13:13:22 UTC
[arrow] branch master updated: ARROW-16166: [C++][Compute] Utilities for assembling join output
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b0c75dee34 ARROW-16166: [C++][Compute] Utilities for assembling join output
b0c75dee34 is described below
commit b0c75dee34de65834e5a83438e6581f90970fd3d
Author: michalursa <mi...@ursacomputing.com>
AuthorDate: Thu Apr 21 09:13:05 2022 -0400
ARROW-16166: [C++][Compute] Utilities for assembling join output
Adding utilities that make it easier to filter, replicate and accumulate rows from potentially multiple sources in a single exec batch.
This is meant to be used in hash join for reducing overheads in producing its output.
Also, moving light-weight array utilities to the same file and adding helper functions for generating them.
Thanks to Weston Pace for updating comments and writing the unit tests.
Closes #12872 from michalursa/ARROW-16166-join-array-utils
Authored-by: michalursa <mi...@ursacomputing.com>
Signed-off-by: David Li <li...@gmail.com>
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/compute/CMakeLists.txt | 1 +
cpp/src/arrow/compute/exec/key_compare.cc | 25 +-
cpp/src/arrow/compute/exec/key_compare.h | 35 +-
cpp/src/arrow/compute/exec/key_compare_avx2.cc | 14 +-
cpp/src/arrow/compute/exec/key_encode.cc | 93 +--
cpp/src/arrow/compute/exec/key_encode.h | 75 +--
cpp/src/arrow/compute/exec/key_hash.cc | 4 +-
cpp/src/arrow/compute/exec/key_hash.h | 4 +-
cpp/src/arrow/compute/kernels/hash_aggregate.cc | 30 +-
cpp/src/arrow/compute/light_array.cc | 729 ++++++++++++++++++++++++
cpp/src/arrow/compute/light_array.h | 382 +++++++++++++
cpp/src/arrow/compute/light_array_test.cc | 481 ++++++++++++++++
13 files changed, 1654 insertions(+), 220 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 2933457287..2e74a2cac1 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -407,6 +407,7 @@ if(ARROW_COMPUTE)
compute/function.cc
compute/function_internal.cc
compute/kernel.cc
+ compute/light_array.cc
compute/registry.cc
compute/kernels/aggregate_basic.cc
compute/kernels/aggregate_mode.cc
diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt
index 897dc32f35..27e693d9a3 100644
--- a/cpp/src/arrow/compute/CMakeLists.txt
+++ b/cpp/src/arrow/compute/CMakeLists.txt
@@ -63,6 +63,7 @@ add_arrow_compute_test(internals_test
function_test.cc
exec_test.cc
kernel_test.cc
+ light_array_test.cc
registry_test.cc)
add_arrow_benchmark(function_benchmark PREFIX "arrow-compute")
diff --git a/cpp/src/arrow/compute/exec/key_compare.cc b/cpp/src/arrow/compute/exec/key_compare.cc
index ed94bf7230..d873aec692 100644
--- a/cpp/src/arrow/compute/exec/key_compare.cc
+++ b/cpp/src/arrow/compute/exec/key_compare.cc
@@ -34,7 +34,7 @@ void KeyCompare::NullUpdateColumnToRow(uint32_t id_col, uint32_t num_rows_to_com
const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map,
KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col,
+ const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector) {
if (!rows.has_any_nulls(ctx) && !col.data(0)) {
@@ -91,7 +91,7 @@ void KeyCompare::CompareBinaryColumnToRowHelper(
uint32_t offset_within_row, uint32_t first_row_to_compare,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector, COMPARE_FN compare_fn) {
bool is_fixed_length = rows.metadata().is_fixed_length;
if (is_fixed_length) {
@@ -121,7 +121,7 @@ template <bool use_selection>
void KeyCompare::CompareBinaryColumnToRow(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
uint32_t num_processed = 0;
#if defined(ARROW_HAVE_AVX2)
@@ -231,7 +231,7 @@ template <bool use_selection, bool is_first_varbinary_col>
void KeyCompare::CompareVarBinaryColumnToRow(
uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
#if defined(ARROW_HAVE_AVX2)
if (ctx->has_avx2()) {
@@ -306,14 +306,11 @@ void KeyCompare::AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num
}
}
-void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx,
- uint32_t* out_num_rows,
- uint16_t* out_sel_left_maybe_same,
- const std::vector<KeyEncoder::KeyColumnArray>& cols,
- const KeyEncoder::KeyRowArray& rows) {
+void KeyCompare::CompareColumnsToRows(
+ uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+ uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
+ const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows) {
if (num_rows_to_compare == 0) {
*out_num_rows = 0;
return;
@@ -333,7 +330,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
bool is_first_column = true;
for (size_t icol = 0; icol < cols.size(); ++icol) {
- const KeyEncoder::KeyColumnArray& col = cols[icol];
+ const KeyColumnArray& col = cols[icol];
if (col.metadata().is_null_type) {
// If this null type col is the first column, the match_bytevector_A needs to be
// initialized with 0xFF. Otherwise, the calculation can be skipped
@@ -374,7 +371,7 @@ void KeyCompare::CompareColumnsToRows(uint32_t num_rows_to_compare,
uint32_t ivarbinary = 0;
for (size_t icol = 0; icol < cols.size(); ++icol) {
- const KeyEncoder::KeyColumnArray& col = cols[icol];
+ const KeyColumnArray& col = cols[icol];
if (!col.metadata().is_fixed_length) {
// Process varbinary and nulls
if (sel_left_maybe_null) {
diff --git a/cpp/src/arrow/compute/exec/key_compare.h b/cpp/src/arrow/compute/exec/key_compare.h
index aeb5abbdd1..773b32d46c 100644
--- a/cpp/src/arrow/compute/exec/key_compare.h
+++ b/cpp/src/arrow/compute/exec/key_compare.h
@@ -33,14 +33,11 @@ class KeyCompare {
// Returns a single 16-bit selection vector of rows that failed comparison.
// If there is input selection on the left, the resulting selection is a filtered image
// of input selection.
- static void CompareColumnsToRows(uint32_t num_rows_to_compare,
- const uint16_t* sel_left_maybe_null,
- const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx,
- uint32_t* out_num_rows,
- uint16_t* out_sel_left_maybe_same,
- const std::vector<KeyEncoder::KeyColumnArray>& cols,
- const KeyEncoder::KeyRowArray& rows);
+ static void CompareColumnsToRows(
+ uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
+ const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
+ uint32_t* out_num_rows, uint16_t* out_sel_left_maybe_same,
+ const std::vector<KeyColumnArray>& cols, const KeyEncoder::KeyRowArray& rows);
private:
template <bool use_selection>
@@ -48,7 +45,7 @@ class KeyCompare {
const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map,
KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col,
+ const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector);
@@ -57,21 +54,21 @@ class KeyCompare {
uint32_t offset_within_row, uint32_t first_row_to_compare,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector, COMPARE_FN compare_fn);
template <bool use_selection>
static void CompareBinaryColumnToRow(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
template <bool use_selection, bool is_first_varbinary_col>
static void CompareVarBinaryColumnToRow(
uint32_t id_varlen_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
static void AndByteVectors(KeyEncoder::KeyEncoderContext* ctx, uint32_t num_elements,
@@ -83,14 +80,14 @@ class KeyCompare {
static uint32_t NullUpdateColumnToRowImp_avx2(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector);
template <bool use_selection, class COMPARE8_FN>
static uint32_t CompareBinaryColumnToRowHelper_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
COMPARE8_FN compare8_fn);
@@ -98,14 +95,14 @@ class KeyCompare {
static uint32_t CompareBinaryColumnToRowImp_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
template <bool use_selection, bool is_first_varbinary_col>
static void CompareVarBinaryColumnToRowImp_avx2(
uint32_t id_varlen_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
static uint32_t AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevector_A,
@@ -114,20 +111,20 @@ class KeyCompare {
static uint32_t NullUpdateColumnToRow_avx2(
bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
static uint32_t CompareBinaryColumnToRow_avx2(
bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);
static void CompareVarBinaryColumnToRow_avx2(
bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector);
#endif
diff --git a/cpp/src/arrow/compute/exec/key_compare_avx2.cc b/cpp/src/arrow/compute/exec/key_compare_avx2.cc
index df13e8cae3..e45486b2eb 100644
--- a/cpp/src/arrow/compute/exec/key_compare_avx2.cc
+++ b/cpp/src/arrow/compute/exec/key_compare_avx2.cc
@@ -40,7 +40,7 @@ template <bool use_selection>
uint32_t KeyCompare::NullUpdateColumnToRowImp_avx2(
uint32_t id_col, uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector) {
if (!rows.has_any_nulls(ctx) && !col.data(0)) {
return num_rows_to_compare;
@@ -180,7 +180,7 @@ template <bool use_selection, class COMPARE8_FN>
uint32_t KeyCompare::CompareBinaryColumnToRowHelper_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector,
COMPARE8_FN compare8_fn) {
bool is_fixed_length = rows.metadata().is_fixed_length;
@@ -419,7 +419,7 @@ template <bool use_selection>
uint32_t KeyCompare::CompareBinaryColumnToRowImp_avx2(
uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
uint32_t col_width = col.metadata().fixed_length;
if (col_width == 0) {
@@ -503,7 +503,7 @@ template <bool use_selection, bool is_first_varbinary_col>
void KeyCompare::CompareVarBinaryColumnToRowImp_avx2(
uint32_t id_varbinary_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
const uint32_t* offsets_left = col.offsets();
const uint32_t* offsets_right = rows.offsets();
@@ -569,7 +569,7 @@ uint32_t KeyCompare::AndByteVectors_avx2(uint32_t num_elements, uint8_t* bytevec
uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
bool use_selection, uint32_t id_col, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
if (use_selection) {
return NullUpdateColumnToRowImp_avx2<true>(id_col, num_rows_to_compare,
@@ -585,7 +585,7 @@ uint32_t KeyCompare::NullUpdateColumnToRow_avx2(
uint32_t KeyCompare::CompareBinaryColumnToRow_avx2(
bool use_selection, uint32_t offset_within_row, uint32_t num_rows_to_compare,
const uint16_t* sel_left_maybe_null, const uint32_t* left_to_right_map,
- KeyEncoder::KeyEncoderContext* ctx, const KeyEncoder::KeyColumnArray& col,
+ KeyEncoder::KeyEncoderContext* ctx, const KeyColumnArray& col,
const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector) {
if (use_selection) {
return CompareBinaryColumnToRowImp_avx2<true>(offset_within_row, num_rows_to_compare,
@@ -602,7 +602,7 @@ void KeyCompare::CompareVarBinaryColumnToRow_avx2(
bool use_selection, bool is_first_varbinary_col, uint32_t id_varlen_col,
uint32_t num_rows_to_compare, const uint16_t* sel_left_maybe_null,
const uint32_t* left_to_right_map, KeyEncoder::KeyEncoderContext* ctx,
- const KeyEncoder::KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
+ const KeyColumnArray& col, const KeyEncoder::KeyRowArray& rows,
uint8_t* match_bytevector) {
if (use_selection) {
if (is_first_varbinary_col) {
diff --git a/cpp/src/arrow/compute/exec/key_encode.cc b/cpp/src/arrow/compute/exec/key_encode.cc
index f8bd7c2503..3d92c77b09 100644
--- a/cpp/src/arrow/compute/exec/key_encode.cc
+++ b/cpp/src/arrow/compute/exec/key_encode.cc
@@ -271,87 +271,8 @@ bool KeyEncoder::KeyRowArray::has_any_nulls(const KeyEncoderContext* ctx) const
return has_any_nulls_;
}
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
- const KeyColumnArray& left,
- const KeyColumnArray& right,
- int buffer_id_to_replace) {
- metadata_ = metadata;
- length_ = left.length();
- for (int i = 0; i < max_buffers_; ++i) {
- buffers_[i] = left.buffers_[i];
- mutable_buffers_[i] = left.mutable_buffers_[i];
- }
- buffers_[buffer_id_to_replace] = right.buffers_[buffer_id_to_replace];
- mutable_buffers_[buffer_id_to_replace] = right.mutable_buffers_[buffer_id_to_replace];
- bit_offset_[0] = left.bit_offset_[0];
- bit_offset_[1] = left.bit_offset_[1];
- if (buffer_id_to_replace < max_buffers_ - 1) {
- bit_offset_[buffer_id_to_replace] = right.bit_offset_[buffer_id_to_replace];
- }
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
- int64_t length, const uint8_t* buffer0,
- const uint8_t* buffer1, const uint8_t* buffer2,
- int bit_offset0, int bit_offset1) {
- metadata_ = metadata;
- length_ = length;
- buffers_[0] = buffer0;
- buffers_[1] = buffer1;
- buffers_[2] = buffer2;
- mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
- bit_offset_[0] = bit_offset0;
- bit_offset_[1] = bit_offset1;
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata,
- int64_t length, uint8_t* buffer0,
- uint8_t* buffer1, uint8_t* buffer2,
- int bit_offset0, int bit_offset1) {
- metadata_ = metadata;
- length_ = length;
- buffers_[0] = mutable_buffers_[0] = buffer0;
- buffers_[1] = mutable_buffers_[1] = buffer1;
- buffers_[2] = mutable_buffers_[2] = buffer2;
- bit_offset_[0] = bit_offset0;
- bit_offset_[1] = bit_offset1;
-}
-
-KeyEncoder::KeyColumnArray::KeyColumnArray(const KeyColumnArray& from, int64_t start,
- int64_t length) {
- metadata_ = from.metadata_;
- length_ = length;
- uint32_t fixed_size =
- !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
-
- buffers_[0] =
- from.buffers_[0] ? from.buffers_[0] + (from.bit_offset_[0] + start) / 8 : nullptr;
- mutable_buffers_[0] = from.mutable_buffers_[0]
- ? from.mutable_buffers_[0] + (from.bit_offset_[0] + start) / 8
- : nullptr;
- bit_offset_[0] = (from.bit_offset_[0] + start) % 8;
-
- if (fixed_size == 0 && !metadata_.is_null_type) {
- buffers_[1] =
- from.buffers_[1] ? from.buffers_[1] + (from.bit_offset_[1] + start) / 8 : nullptr;
- mutable_buffers_[1] = from.mutable_buffers_[1] ? from.mutable_buffers_[1] +
- (from.bit_offset_[1] + start) / 8
- : nullptr;
- bit_offset_[1] = (from.bit_offset_[1] + start) % 8;
- } else {
- buffers_[1] = from.buffers_[1] ? from.buffers_[1] + start * fixed_size : nullptr;
- mutable_buffers_[1] = from.mutable_buffers_[1]
- ? from.mutable_buffers_[1] + start * fixed_size
- : nullptr;
- bit_offset_[1] = 0;
- }
-
- buffers_[2] = from.buffers_[2];
- mutable_buffers_[2] = from.mutable_buffers_[2];
-}
-
-KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
- const KeyColumnArray& column, const KeyColumnArray& temp) {
+KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp) {
// Make sure that the temp buffer is large enough
DCHECK(temp.length() >= column.length() && temp.metadata().is_fixed_length &&
temp.metadata().fixed_length >= sizeof(uint8_t));
@@ -359,8 +280,7 @@ KeyEncoder::KeyColumnArray KeyEncoder::TransformBoolean::ArrayReplace(
metadata.is_fixed_length = true;
metadata.fixed_length = sizeof(uint8_t);
constexpr int buffer_index = 1;
- KeyColumnArray result = KeyColumnArray(metadata, column, temp, buffer_index);
- return result;
+ return column.WithBufferFrom(temp, buffer_index).WithMetadata(metadata);
}
void KeyEncoder::TransformBoolean::PostDecode(const KeyColumnArray& input,
@@ -387,8 +307,8 @@ bool KeyEncoder::EncoderInteger::UsesTransform(const KeyColumnArray& column) {
return IsBoolean(column.metadata());
}
-KeyEncoder::KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(
- const KeyColumnArray& column, const KeyColumnArray& temp) {
+KeyColumnArray KeyEncoder::EncoderInteger::ArrayReplace(const KeyColumnArray& column,
+ const KeyColumnArray& temp) {
if (IsBoolean(column.metadata())) {
return TransformBoolean::ArrayReplace(column, temp);
}
@@ -955,7 +875,8 @@ void KeyEncoder::PrepareKeyColumnArrays(int64_t start_row, int64_t num_rows,
uint32_t num_varbinary_visited = 0;
for (uint32_t i = 0; i < num_cols; ++i) {
const KeyColumnArray& col = cols_in[row_metadata_.column_order[i]];
- KeyColumnArray col_window(col, start_row, num_rows);
+ KeyColumnArray col_window = col.Slice(start_row, num_rows);
+
batch_all_cols_[i] = col_window;
if (!col.metadata().is_fixed_length) {
DCHECK(num_varbinary_visited < batch_varbinary_cols_.size());
diff --git a/cpp/src/arrow/compute/exec/key_encode.h b/cpp/src/arrow/compute/exec/key_encode.h
index da533434d3..f9de31d9c2 100644
--- a/cpp/src/arrow/compute/exec/key_encode.h
+++ b/cpp/src/arrow/compute/exec/key_encode.h
@@ -22,6 +22,7 @@
#include <vector>
#include "arrow/compute/exec/util.h"
+#include "arrow/compute/light_array.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
@@ -30,8 +31,6 @@
namespace arrow {
namespace compute {
-class KeyColumnMetadata;
-
/// Converts between key representation as a collection of arrays for
/// individual columns and another representation as a single array of rows
/// combining data from all columns into one value.
@@ -49,27 +48,6 @@ class KeyEncoder {
util::TempVectorStack* stack;
};
- /// Description of a storage format of a single key column as needed
- /// for the purpose of row encoding.
- struct KeyColumnMetadata {
- KeyColumnMetadata() = default;
- KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
- bool is_null_type_in = false)
- : is_fixed_length(is_fixed_length_in),
- is_null_type(is_null_type_in),
- fixed_length(fixed_length_in) {}
- /// Is column storing a varying-length binary, using offsets array
- /// to find a beginning of a value, or is it a fixed-length binary.
- bool is_fixed_length;
- /// Is column null type
- bool is_null_type;
- /// For a fixed-length binary column: number of bytes per value.
- /// Zero has a special meaning, indicating a bit vector with one bit per value if it
- /// isn't a null type column.
- /// For a varying-length binary column: number of bytes per offset.
- uint32_t fixed_length;
- };
-
/// Description of a storage format for rows produced by encoder.
struct KeyRowMetadata {
/// Is row a varying-length binary, using offsets array to find a beginning of a row,
@@ -241,57 +219,6 @@ class KeyEncoder {
mutable bool has_any_nulls_;
};
- /// A lightweight description of an array representing one of key columns.
- class KeyColumnArray {
- public:
- KeyColumnArray() = default;
- /// Create as a mix of buffers according to the mask from two descriptions
- /// (Nth bit is set to 0 if Nth buffer from the first input
- /// should be used and is set to 1 otherwise).
- /// Metadata is inherited from the first input.
- KeyColumnArray(const KeyColumnMetadata& metadata, const KeyColumnArray& left,
- const KeyColumnArray& right, int buffer_id_to_replace);
- /// Create for reading
- KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
- const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
- int bit_offset0 = 0, int bit_offset1 = 0);
- /// Create for writing
- KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
- uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
- int bit_offset1 = 0);
- /// Create as a window view of original description that is offset
- /// by a given number of rows.
- /// The number of rows used in offset must be divisible by 8
- /// in order to not split bit vectors within a single byte.
- KeyColumnArray(const KeyColumnArray& from, int64_t start, int64_t length);
- uint8_t* mutable_data(int i) {
- ARROW_DCHECK(i >= 0 && i <= max_buffers_);
- return mutable_buffers_[i];
- }
- const uint8_t* data(int i) const {
- ARROW_DCHECK(i >= 0 && i <= max_buffers_);
- return buffers_[i];
- }
- uint32_t* mutable_offsets() { return reinterpret_cast<uint32_t*>(mutable_data(1)); }
- const uint32_t* offsets() const { return reinterpret_cast<const uint32_t*>(data(1)); }
- const KeyColumnMetadata& metadata() const { return metadata_; }
- int64_t length() const { return length_; }
- int bit_offset(int i) const {
- ARROW_DCHECK(i >= 0 && i < max_buffers_);
- return bit_offset_[i];
- }
-
- private:
- static constexpr int max_buffers_ = 3;
- const uint8_t* buffers_[max_buffers_];
- uint8_t* mutable_buffers_[max_buffers_];
- KeyColumnMetadata metadata_;
- int64_t length_;
- // Starting bit offset within the first byte (between 0 and 7)
- // to be used when accessing buffers that store bit vectors.
- int bit_offset_[max_buffers_ - 1];
- };
-
void Init(const std::vector<KeyColumnMetadata>& cols, KeyEncoderContext* ctx,
int row_alignment, int string_alignment);
diff --git a/cpp/src/arrow/compute/exec/key_hash.cc b/cpp/src/arrow/compute/exec/key_hash.cc
index bc4cae74dd..125fd3912e 100644
--- a/cpp/src/arrow/compute/exec/key_hash.cc
+++ b/cpp/src/arrow/compute/exec/key_hash.cc
@@ -375,7 +375,7 @@ void Hashing32::HashFixed(int64_t hardware_flags, bool combine_hashes, uint32_t
}
}
-void Hashing32::HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+void Hashing32::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint32_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
@@ -799,7 +799,7 @@ void Hashing64::HashFixed(bool combine_hashes, uint32_t num_rows, uint64_t lengt
}
}
-void Hashing64::HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+void Hashing64::HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes) {
uint32_t num_rows = static_cast<uint32_t>(cols[0].length());
diff --git a/cpp/src/arrow/compute/exec/key_hash.h b/cpp/src/arrow/compute/exec/key_hash.h
index 88f77be1a4..719f3dfd46 100644
--- a/cpp/src/arrow/compute/exec/key_hash.h
+++ b/cpp/src/arrow/compute/exec/key_hash.h
@@ -45,7 +45,7 @@ class ARROW_EXPORT Hashing32 {
friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
public:
- static void HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+ static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint32_t* out_hash);
private:
@@ -153,7 +153,7 @@ class ARROW_EXPORT Hashing64 {
friend void TestBloomSmall(BloomFilterBuildStrategy, int64_t, int, bool, bool);
public:
- static void HashMultiColumn(const std::vector<KeyEncoder::KeyColumnArray>& cols,
+ static void HashMultiColumn(const std::vector<KeyColumnArray>& cols,
KeyEncoder::KeyEncoderContext* ctx, uint64_t* hashes);
private:
diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
index ab8e6cd77d..db34ee6c59 100644
--- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc
+++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc
@@ -237,19 +237,18 @@ struct GrouperFastImpl : Grouper {
auto bit_width = checked_cast<const FixedWidthType&>(*key).bit_width();
ARROW_DCHECK(bit_width % 8 == 0);
impl->col_metadata_[icol] =
- arrow::compute::KeyEncoder::KeyColumnMetadata(true, bit_width / 8);
+ arrow::compute::KeyColumnMetadata(true, bit_width / 8);
} else if (key->id() == Type::BOOL) {
- impl->col_metadata_[icol] =
- arrow::compute::KeyEncoder::KeyColumnMetadata(true, 0);
+ impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(true, 0);
} else if (is_fixed_width(key->id())) {
- impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata(
+ impl->col_metadata_[icol] = arrow::compute::KeyColumnMetadata(
true, checked_cast<const FixedWidthType&>(*key).bit_width() / 8);
} else if (is_binary_like(key->id())) {
impl->col_metadata_[icol] =
- arrow::compute::KeyEncoder::KeyColumnMetadata(false, sizeof(uint32_t));
+ arrow::compute::KeyColumnMetadata(false, sizeof(uint32_t));
} else if (key->id() == Type::NA) {
- impl->col_metadata_[icol] = arrow::compute::KeyEncoder::KeyColumnMetadata(
- true, 0, /*is_null_type_in=*/true);
+ impl->col_metadata_[icol] =
+ arrow::compute::KeyColumnMetadata(true, 0, /*is_null_type_in=*/true);
} else {
return Status::NotImplemented("Keys of type ", *key);
}
@@ -352,11 +351,10 @@ struct GrouperFastImpl : Grouper {
int64_t offset = batch[icol].array()->offset;
- auto col_base = arrow::compute::KeyEncoder::KeyColumnArray(
+ auto col_base = arrow::compute::KeyColumnArray(
col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen);
- cols_[icol] =
- arrow::compute::KeyEncoder::KeyColumnArray(col_base, offset, num_rows);
+ cols_[icol] = col_base.Slice(offset, num_rows);
}
// Split into smaller mini-batches
@@ -434,8 +432,8 @@ struct GrouperFastImpl : Grouper {
if (col_metadata_[i].is_null_type) {
uint8_t* non_nulls = NULLPTR;
uint8_t* fixedlen = NULLPTR;
- cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
- col_metadata_[i], num_groups, non_nulls, fixedlen, NULLPTR);
+ cols_[i] = arrow::compute::KeyColumnArray(col_metadata_[i], num_groups, non_nulls,
+ fixedlen, NULLPTR);
continue;
}
ARROW_ASSIGN_OR_RAISE(non_null_bufs[i], AllocatePaddedBitmap(num_groups));
@@ -451,7 +449,7 @@ struct GrouperFastImpl : Grouper {
ARROW_ASSIGN_OR_RAISE(fixedlen_bufs[i],
AllocatePaddedBuffer((num_groups + 1) * sizeof(uint32_t)));
}
- cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
+ cols_[i] = arrow::compute::KeyColumnArray(
col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
fixedlen_bufs[i]->mutable_data(), nullptr);
}
@@ -470,7 +468,7 @@ struct GrouperFastImpl : Grouper {
auto varlen_size =
reinterpret_cast<const uint32_t*>(fixedlen_bufs[i]->data())[num_groups];
ARROW_ASSIGN_OR_RAISE(varlen_bufs[i], AllocatePaddedBuffer(varlen_size));
- cols_[i] = arrow::compute::KeyEncoder::KeyColumnArray(
+ cols_[i] = arrow::compute::KeyColumnArray(
col_metadata_[i], num_groups, non_null_bufs[i]->mutable_data(),
fixedlen_bufs[i]->mutable_data(), varlen_bufs[i]->mutable_data());
}
@@ -534,8 +532,8 @@ struct GrouperFastImpl : Grouper {
arrow::compute::KeyEncoder::KeyEncoderContext encode_ctx_;
std::vector<std::shared_ptr<arrow::DataType>> key_types_;
- std::vector<arrow::compute::KeyEncoder::KeyColumnMetadata> col_metadata_;
- std::vector<arrow::compute::KeyEncoder::KeyColumnArray> cols_;
+ std::vector<arrow::compute::KeyColumnMetadata> col_metadata_;
+ std::vector<arrow::compute::KeyColumnArray> cols_;
std::vector<uint32_t> minibatch_hashes_;
std::vector<std::shared_ptr<Array>> dictionaries_;
diff --git a/cpp/src/arrow/compute/light_array.cc b/cpp/src/arrow/compute/light_array.cc
new file mode 100644
index 0000000000..390dcc6cbf
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array.cc
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+// Constructs a read-only, non-owning view over up to three column buffers
+// (validity bitmap, fixed-length data, variable-length data).  The mutable
+// buffer pointers are left null so this view cannot be written through.
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* validity_buffer,
+                               const uint8_t* fixed_length_buffer,
+                               const uint8_t* var_length_buffer, int bit_offset_validity,
+                               int bit_offset_fixed) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[kValidityBuffer] = validity_buffer;
+  buffers_[kFixedLengthBuffer] = fixed_length_buffer;
+  buffers_[kVariableLengthBuffer] = var_length_buffer;
+  // Read-only view: no mutable aliases of the buffers.
+  mutable_buffers_[kValidityBuffer] = mutable_buffers_[kFixedLengthBuffer] =
+      mutable_buffers_[kVariableLengthBuffer] = nullptr;
+  bit_offset_[kValidityBuffer] = bit_offset_validity;
+  bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
+}
+
+// Constructs a mutable, non-owning view: the const and mutable buffer pointer
+// arrays alias the same underlying buffers.
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
+                               uint8_t* var_length_buffer, int bit_offset_validity,
+                               int bit_offset_fixed) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer;
+  buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] =
+      fixed_length_buffer;
+  buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] =
+      var_length_buffer;
+  bit_offset_[kValidityBuffer] = bit_offset_validity;
+  bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
+}
+
+// Returns a shallow copy of this view with one buffer (both its const and its
+// mutable pointer) replaced by the corresponding buffer of `other`.  The last
+// buffer (variable-length data) has no bit offset, hence the bounds check
+// before copying bit_offset_.
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < kMaxBuffers - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+// Returns a shallow copy of this view with its layout metadata replaced.
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+// Returns a zero-copy view over rows [offset, offset + length) of this array.
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  // For varying-length columns the fixed-length buffer holds uint32_t offsets.
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  // Buffer 0 (validity bitmap) is bit-addressed: advance by whole bytes and
+  // keep the remainder as the new bit offset.
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    // Boolean column: buffer 1 is also a bitmap and is sliced the same way.
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    // Byte-addressed values (or uint32_t offsets); no bit offset needed.
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  // Buffer 2 is shared unchanged: the offsets in buffer 1 index into it
+  // directly, so slicing buffer 1 is sufficient.
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+// Maps an arrow::DataType onto the lightweight KeyColumnMetadata layout
+// descriptor.  Returns a TypeError for nested / otherwise unsupported types.
+Result<KeyColumnMetadata> ColumnMetadataFromDataType(
+    const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    // Dictionary columns are represented here by their fixed-width indices.
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    // fixed_length == 0 denotes a bit-packed (boolean) column.
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    // 32-bit offsets into the variable-length buffer.
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    // 64-bit offsets into the variable-length buffer.
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Caller attempted to create a KeyColumnArray from an invalid type
+  return Status::TypeError("Unsupported column data type ", type->name(),
+                           " used with KeyColumnMetadata");
+}
+
+// Builds a read-only KeyColumnArray view over `array_data`, restricted to
+// rows [start_row, start_row + num_rows).  The view is first constructed over
+// the whole physical buffers (including the array's own offset) and then
+// sliced down to the requested window.
+//
+// NOTE(review): buffers[1] is dereferenced unconditionally; for a null (NA)
+// type array that buffer can be absent — confirm callers never pass one here.
+Result<KeyColumnArray> ColumnArrayFromArrayData(
+    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows) {
+  ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
+                        ColumnMetadataFromDataType(array_data->type));
+  KeyColumnArray column_array = KeyColumnArray(
+      metadata, array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+// Fills `column_metadatas` with one KeyColumnMetadata per column of `batch`.
+// All batch values must be arrays (not scalars).
+Status ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                    std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    ARROW_ASSIGN_OR_RAISE((*column_metadatas)[i],
+                          ColumnMetadataFromDataType(array_data->type));
+  }
+  return Status::OK();
+}
+
+// Fills `column_arrays` with one KeyColumnArray view per column of `batch`,
+// each restricted to rows [start_row, start_row + num_rows).
+Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                                 std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    ARROW_ASSIGN_OR_RAISE((*column_arrays)[i],
+                          ColumnArrayFromArrayData(array_data, start_row, num_rows));
+  }
+  return Status::OK();
+}
+
+// Convenience overload covering all rows of the batch.
+// NOTE(review): batch.length is narrowed from int64_t to int — presumably
+// batches here never exceed INT_MAX rows; confirm against callers.
+Status ColumnArraysFromExecBatch(const ExecBatch& batch,
+                                 std::vector<KeyColumnArray>* column_arrays) {
+  return ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length),
+                                   column_arrays);
+}
+
+// (Re)initializes this container for a given data type and memory pool.
+// Existing allocations are kept for reuse, which is why the debug build
+// verifies that the new type has the same physical layout as the old one.
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before =
+        ColumnMetadataFromDataType(data_type_).ValueOrDie();
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type).ValueOrDie();
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+// Resets the row count.  When `release_buffers` is true the buffers are freed
+// as well; otherwise the allocations stay around for the next use.
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    buffers_[kValidityBuffer].reset();
+    buffers_[kFixedLengthBuffer].reset();
+    buffers_[kVariableLengthBuffer].reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+// Grows the validity and fixed-length buffers so they can hold
+// `num_rows_new` rows.  Capacity grows geometrically (doubling) starting from
+// 2^log_num_rows_min_ rows; for varying-length columns the fixed-length
+// buffer stores (num_rows + 1) uint32_t offsets.
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  // Find the next power-of-two capacity that fits the requested row count.
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  if (buffers_[kFixedLengthBuffer] == NULLPTR) {
+    // First allocation: all three buffers are created together.
+    ARROW_DCHECK(buffers_[kValidityBuffer] == NULLPTR &&
+                 buffers_[kVariableLengthBuffer] == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        buffers_[kValidityBuffer],
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        // Bit-packed (boolean) values.
+        ARROW_ASSIGN_OR_RAISE(
+            buffers_[kFixedLengthBuffer],
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            buffers_[kFixedLengthBuffer],
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      // Varying-length column: (num_rows + 1) uint32_t offsets.
+      ARROW_ASSIGN_OR_RAISE(
+          buffers_[kFixedLengthBuffer],
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    // The variable-length buffer starts with a small placeholder allocation;
+    // it is grown on demand by ResizeVaryingLengthBuffer().
+    ARROW_ASSIGN_OR_RAISE(
+        buffers_[kVariableLengthBuffer],
+        AllocateResizableBuffer(sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    // Subsequent growth: resize the existing buffers in place.
+    ARROW_DCHECK(buffers_[kValidityBuffer] != NULLPTR &&
+                 buffers_[kVariableLengthBuffer] != NULLPTR);
+
+    RETURN_NOT_OK(buffers_[kValidityBuffer]->Resize(
+        bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(buffers_[kFixedLengthBuffer]->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+// Grows the variable-length buffer (geometrically) so it can hold the number
+// of bytes recorded in the last offset, i.e. offsets[num_rows_].  Callers
+// must have written the offsets into the fixed-length buffer first.
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(reinterpret_cast<const uint32_t*>(
+        buffers_[kFixedLengthBuffer]->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(buffers_[kVariableLengthBuffer]->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+// Returns a mutable KeyColumnArray view over the accumulated rows.
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+  return KeyColumnArray(column_metadata, num_rows_,
+                        buffers_[kValidityBuffer]->mutable_data(),
+                        buffers_[kFixedLengthBuffer]->mutable_data(),
+                        buffers_[kVariableLengthBuffer]->mutable_data());
+}
+
+// Materializes an ArrayData that shares this container's buffers.  The null
+// count is computed here by scanning the validity bitmap.
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_).ValueOrDie();
+
+  auto valid_count = arrow::internal::CountSetBits(
+      buffers_[kValidityBuffer]->data(), /*offset=*/0, static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer]},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {buffers_[kValidityBuffer], buffers_[kFixedLengthBuffer],
+                            buffers_[kVariableLengthBuffer]},
+                           null_count);
+  }
+}
+
+// Returns the number of trailing entries of `row_ids` that together span at
+// least `num_tail_bytes_to_skip` output bytes.  Callers copy all earlier rows
+// with full 64-bit word stores (which may write past a value's end) and
+// handle only these trailing rows with byte-exact copies.
+//
+// NOTE(review): unlike Visit(), the var-len branch reads the offsets buffer
+// without adding column->offset — presumably callers only pass unsliced
+// arrays here; confirm.
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata =
+      ColumnMetadataFromDataType(column->type).ValueOrDie();
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        // Bit-packed column: one output byte covers 8 rows (clamped at 0).
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      // Varying-length: add the actual byte length of the dropped row.
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+// Gathers the bits selected by `row_ids` from `input_bits` and appends them
+// at `output_bits_offset`.  Processes 8 rows at a time, assembling a full
+// output byte per iteration; the OUTPUT_BYTE_ALIGNED specialization avoids
+// the shift/merge needed when the output position is not byte aligned.
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    // Clear the bits above the current position in the partially-filled byte.
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    // Assemble one output byte from 8 gathered bits (LSB first).
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      // Split the byte across the two output bytes it straddles.
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    // Tail: copy the remaining (< 8) bits one at a time.
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+// Dispatches to the byte-aligned or unaligned variant of CollectBitsImp.
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+// Invokes process_value_fn(i, value_ptr, value_length) for each selected row.
+// `i` is the index into `row_ids`, not the row id itself.  Handles both
+// varying-length (offset-indexed) and fixed-length columns; bit-packed
+// columns (fixed_length == 0) are not supported here.
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type).ValueOrDie();
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    // Offsets are adjusted by the array's slice offset.
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+// Appends the rows selected by `row_ids` from a single source column to
+// `target`.  Fixed-width values are copied with width-specialized loops
+// (whole 64-bit words where it is safe to over-write, byte-exact copies for
+// the trailing rows identified by NumRowsToSkip).  Varying-length values are
+// appended in two passes: offsets first, then the data bytes.
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata =
+      ColumnMetadataFromDataType(source->type).ValueOrDie();
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        // Bit-packed (boolean) values.
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        // Other widths: bulk rows use whole-word stores (may over-write up to
+        // a word past the value); the trailing rows use exact memcpy so we
+        // never write past the end of the buffer.
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    // First store each value's length, then convert lengths to a running
+    // prefix sum in place.
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    // Absent validity bitmap means every source row is valid: set the
+    // appended range of the target bitmap to all ones.
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+// Appends `num_rows_to_append` null rows to `target`: zeroed fixed-length
+// data (or repeated offsets for varying-length columns, i.e. empty values)
+// and cleared validity bits.
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type).ValueOrDie();
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      // Bit-packed values: clear the tail of the current byte, then zero the
+      // remaining whole bytes.
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    // Varying-length: repeat the current end offset so each appended value is
+    // zero-length.
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+// Appends the selected rows of (a subset of the columns of) `batch` to the
+// accumulated output.  `col_ids` optionally maps output column i to batch
+// column col_ids[i]; when null, columns are taken in order.
+//
+// NOTE(review): after the first append, num_cols is assumed to match
+// values_.size() but is not re-validated — confirm callers keep it constant.
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  if (num_rows() + num_rows_to_append > num_rows_max()) {
+    return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows");
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+// Appends `num_rows_to_append` all-null rows across every output column,
+// creating the columns from `types` on first use.
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  if (num_rows() + num_rows_to_append > num_rows_max()) {
+    return Status::CapacityError("ExecBatch builder exceeded limit of accumulated rows.");
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+// Converts the accumulated columns into an ExecBatch and resets the builder,
+// releasing the per-column buffers.  Must only be called with rows present.
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);
+  ExecBatch out({}, num_rows());
+  out.values.resize(values_.size());
+  for (size_t i = 0; i < values_.size(); ++i) {
+    out.values[i] = values_[i].array_data();
+    values_[i].Clear(true);
+  }
+  return out;
+}
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/light_array.h b/cpp/src/arrow/compute/light_array.h
new file mode 100644
index 0000000000..0856e3e8aa
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array.h
@@ -0,0 +1,382 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers. These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
/// \brief Description of the layout of a "key" column
///
/// A "key" column is a non-nested, non-union column.
/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
/// and no children.
///
/// This metadata object is a zero-allocation analogue of arrow::DataType
struct ARROW_EXPORT KeyColumnMetadata {
  KeyColumnMetadata() = default;
  /// \brief Construct metadata directly
  ///
  /// `is_null_type_in` defaults to false so the common two-argument
  /// (is_fixed_length, fixed_length) call sites stay concise.
  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
                    bool is_null_type_in = false)
      : is_fixed_length(is_fixed_length_in),
        is_null_type(is_null_type_in),
        fixed_length(fixed_length_in) {}
  /// \brief True if the column is not a varying-length binary type
  ///
  /// If this is true the column will have a validity buffer and
  /// a data buffer and the third buffer will be unused.
  bool is_fixed_length;
  /// \brief True if this column is the null type
  bool is_null_type;
  /// \brief The number of bytes for each item
  ///
  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
  /// isn't a null type column.
  ///
  /// For a varying-length binary column this represents the number of bytes per offset.
  uint32_t fixed_length;
};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+ /// \brief Create an uninitialized KeyColumnArray
+ KeyColumnArray() = default;
+ /// \brief Create a read-only view from buffers
+ ///
+ /// This is a view only and does not take ownership of the buffers. The lifetime
+ /// of the buffers must exceed the lifetime of this view
+ KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+ const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer,
+ const uint8_t* var_length_buffer, int bit_offset_validity = 0,
+ int bit_offset_fixed = 0);
+ /// \brief Create a mutable view from buffers
+ ///
+ /// This is a view only and does not take ownership of the buffers. The lifetime
+ /// of the buffers must exceed the lifetime of this view
+ KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+ uint8_t* validity_buffer, uint8_t* fixed_length_buffer,
+ uint8_t* var_length_buffer, int bit_offset_validity = 0,
+ int bit_offset_fixed = 0);
+ /// \brief Create a sliced view of `this`
+ ///
+ /// The number of rows used in offset must be divisible by 8
+ /// in order to not split bit vectors within a single byte.
+ KeyColumnArray Slice(int64_t offset, int64_t length) const;
+ /// \brief Create a copy of `this` with a buffer from `other`
+ ///
+ /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+ /// will be replaced by the corresponding buffer in `other`.
+ KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+ int buffer_id_to_replace) const;
+
+ /// \brief Create a copy of `this` with new metadata
+ KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+ // Constants used for accessing buffers using data() and mutable_data().
+ static constexpr int kValidityBuffer = 0;
+ static constexpr int kFixedLengthBuffer = 1;
+ static constexpr int kVariableLengthBuffer = 2;
+
+ /// \brief Return one of the underlying mutable buffers
+ uint8_t* mutable_data(int i) {
+ ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+ return mutable_buffers_[i];
+ }
+ /// \brief Return one of the underlying read-only buffers
+ const uint8_t* data(int i) const {
+ ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+ return buffers_[i];
+ }
+ /// \brief Return a mutable version of the offsets buffer
+ ///
+ /// Only valid if this is a view into a varbinary type
+ uint32_t* mutable_offsets() {
+ DCHECK(!metadata_.is_fixed_length);
+ return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
+ }
+ /// \brief Return a read-only version of the offsets buffer
+ ///
+ /// Only valid if this is a view into a varbinary type
+ const uint32_t* offsets() const {
+ DCHECK(!metadata_.is_fixed_length);
+ return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
+ }
+ /// \brief Return the type metadata
+ const KeyColumnMetadata& metadata() const { return metadata_; }
+ /// \brief Return the length (in rows) of the array
+ int64_t length() const { return length_; }
+ /// \brief Return the bit offset into the corresponding vector
+ ///
+ /// if i == 1 then this must be a bool array
+ int bit_offset(int i) const {
+ ARROW_DCHECK(i >= 0 && i < kMaxBuffers);
+ return bit_offset_[i];
+ }
+
+ private:
+ static constexpr int kMaxBuffers = 3;
+ const uint8_t* buffers_[kMaxBuffers];
+ uint8_t* mutable_buffers_[kMaxBuffers];
+ KeyColumnMetadata metadata_;
+ int64_t length_;
+ // Starting bit offset within the first byte (between 0 and 7)
+ // to be used when accessing buffers that store bit vectors.
+ int bit_offset_[kMaxBuffers - 1];
+};
+
/// \brief Create KeyColumnMetadata from a DataType
///
/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
/// the indices type
///
/// This should only be called on "key" columns. Calling this with
/// a non-key column will return Status::TypeError.
ARROW_EXPORT Result<KeyColumnMetadata> ColumnMetadataFromDataType(
    const std::shared_ptr<DataType>& type);

/// \brief Create KeyColumnArray from ArrayData
///
/// If `type` is a dictionary type then this will return the KeyColumnArray for
/// the indices array
///
/// The caller should ensure this is only called on "key" columns.
/// \see ColumnMetadataFromDataType for details
ARROW_EXPORT Result<KeyColumnArray> ColumnArrayFromArrayData(
    const std::shared_ptr<ArrayData>& array_data, int start_row, int num_rows);

/// \brief Create KeyColumnMetadata instances from an ExecBatch
///
/// column_metadatas will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnMetadataFromDataType for more details
ARROW_EXPORT Status ColumnMetadatasFromExecBatch(
    const ExecBatch& batch, std::vector<KeyColumnMetadata>* column_metadatas);

/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
///
/// column_arrays will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnArrayFromArrayData for more details
ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row,
                                              int num_rows,
                                              std::vector<KeyColumnArray>* column_arrays);

/// \brief Create KeyColumnArray instances from an ExecBatch
///
/// Overload without an explicit row range.
///
/// column_arrays will be resized to fit
///
/// All columns in `batch` must be eligible "key" columns and have an array shape
/// \see ColumnArrayFromArrayData for more details
ARROW_EXPORT Status ColumnArraysFromExecBatch(const ExecBatch& batch,
                                              std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ARROW_EXPORT ResizableArrayData {
+ public:
+ /// \brief Create an uninitialized instance
+ ///
+ /// Init must be called before calling any other operations
+ ResizableArrayData()
+ : log_num_rows_min_(0),
+ pool_(NULLPTR),
+ num_rows_(0),
+ num_rows_allocated_(0),
+ var_len_buf_size_(0) {}
+
+ ~ResizableArrayData() { Clear(true); }
+
+ /// \brief Initialize the array
+ /// \param data_type The data type this array is holding data for.
+ /// \param pool The pool to make allocations on
+ /// \param log_num_rows_min All resize operations will allocate at least enough
+ /// space for (1 << log_num_rows_min) rows
+ void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+ int log_num_rows_min);
+
+ /// \brief Resets the array back to an empty state
+ /// \param release_buffers If true then allocated memory is released and the
+ /// next resize operation will have to reallocate memory
+ void Clear(bool release_buffers);
+
+ /// \brief Resize the fixed length buffers
+ ///
+ /// The buffers will be resized to hold at least `num_rows_new` rows of data
+ Status ResizeFixedLengthBuffers(int num_rows_new);
+
+ /// \brief Resize the varying length buffer if this array is a variable binary type
+ ///
+ /// This must be called after offsets have been populated and the buffer will be
+ /// resized to hold at least as much data as the offsets require
+ ///
+ /// Does nothing if the array is not a variable binary type
+ Status ResizeVaryingLengthBuffer();
+
+ /// \brief The current length (in rows) of the array
+ int num_rows() const { return num_rows_; }
+
+ /// \brief A non-owning view into this array
+ KeyColumnArray column_array() const;
+
+ /// \brief A lightweight descriptor of the data held by this array
+ Result<KeyColumnMetadata> column_metadata() const {
+ return ColumnMetadataFromDataType(data_type_);
+ }
+
+ /// \brief Convert the data to an arrow::ArrayData
+ ///
+ /// This is a zero copy operation and the created ArrayData will reference the
+ /// buffers held by this instance.
+ std::shared_ptr<ArrayData> array_data() const;
+
+ // Constants used for accessing buffers using mutable_data().
+ static constexpr int kValidityBuffer = 0;
+ static constexpr int kFixedLengthBuffer = 1;
+ static constexpr int kVariableLengthBuffer = 2;
+
+ /// \brief A raw pointer to the requested buffer
+ ///
+ /// If i is 0 (kValidityBuffer) then this returns the validity buffer
+ /// If i is 1 (kFixedLengthBuffer) then this returns the buffer used for values (if this
+ /// is a fixed
+ /// length data type) or offsets (if this is a variable binary type)
+ /// If i is 2 (kVariableLengthBuffer) then this returns the buffer used for variable
+ /// length binary data
+ uint8_t* mutable_data(int i) { return buffers_[i]->mutable_data(); }
+
+ private:
+ static constexpr int64_t kNumPaddingBytes = 64;
+ int log_num_rows_min_;
+ std::shared_ptr<DataType> data_type_;
+ MemoryPool* pool_;
+ int num_rows_;
+ int num_rows_allocated_;
+ int var_len_buf_size_;
+ static constexpr int kMaxBuffers = 3;
+ std::shared_ptr<ResizableBuffer> buffers_[kMaxBuffers];
+};
+
/// \brief A builder to concatenate batches of data into a larger batch
///
/// Will only store num_rows_max() rows
class ARROW_EXPORT ExecBatchBuilder {
 public:
  /// \brief Add rows from `source` into `target` column
  ///
  /// If `target` is uninitialized or cleared it will be initialized to use
  /// the given pool.
  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
                               ResizableArrayData* target, int num_rows_to_append,
                               const uint16_t* row_ids, MemoryPool* pool);

  /// \brief Add nulls into `target` column
  ///
  /// If `target` is uninitialized or cleared it will be initialized to use
  /// the given pool.
  static Status AppendNulls(const std::shared_ptr<DataType>& type,
                            ResizableArrayData& target, int num_rows_to_append,
                            MemoryPool* pool);

  /// \brief Add selected rows from `batch`
  ///
  /// If `col_ids` is null then `num_cols` should be no greater than
  /// batch.num_values() and the first `num_cols` columns of batch will be appended.
  ///
  /// All columns in `batch` must have array shape
  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
                        const uint16_t* row_ids, int num_cols,
                        const int* col_ids = NULLPTR);

  /// \brief Add all-null rows
  Status AppendNulls(MemoryPool* pool,
                     const std::vector<std::shared_ptr<DataType>>& types,
                     int num_rows_to_append);

  /// \brief Create an ExecBatch with the data that has been appended so far
  /// and clear this builder to be used again
  ///
  /// Should only be called if num_rows() returns non-zero.
  ExecBatch Flush();

  /// \brief Number of accumulated rows (0 if nothing has been appended yet)
  int num_rows() const { return values_.empty() ? 0 : values_[0].num_rows(); }

  /// \brief Maximum number of rows a single builder will accumulate
  static int num_rows_max() { return 1 << kLogNumRows; }

 private:
  // Log2 of the row capacity; num_rows_max() == 1 << kLogNumRows.
  static constexpr int kLogNumRows = 15;

  // Calculate how many rows to skip from the tail of the
  // sequence of selected rows, such that the total size of skipped rows is at
  // least equal to the size specified by the caller.
  //
  // Skipping of the tail rows
  // is used to allow for faster processing by the caller of remaining rows
  // without checking buffer bounds (useful with SIMD or fixed size memory loads
  // and stores).
  //
  // The sequence of row_ids provided must be non-decreasing.
  //
  static int NumRowsToSkip(const std::shared_ptr<ArrayData>& column, int num_rows,
                           const uint16_t* row_ids, int num_tail_bytes_to_skip);

  // The supplied lambda will be called for each row in the given list of rows.
  // The arguments given to it will be:
  // - index of a row (within the set of selected rows),
  // - pointer to the value,
  // - byte length of the value.
  //
  // The information about nulls (validity bitmap) is not used in this call and
  // has to be processed separately.
  //
  template <class PROCESS_VALUE_FN>
  static void Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
                    const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn);

  // Copy bits at positions given by row_ids from the input bit vector into a
  // dense output bit vector.  OUTPUT_BYTE_ALIGNED presumably selects a fast
  // path when output_bits_offset is byte aligned — confirm against the
  // implementation in light_array.cc.
  template <bool OUTPUT_BYTE_ALIGNED>
  static void CollectBitsImp(const uint8_t* input_bits, int64_t input_bits_offset,
                             uint8_t* output_bits, int64_t output_bits_offset,
                             int num_rows, const uint16_t* row_ids);
  static void CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
                          uint8_t* output_bits, int64_t output_bits_offset, int num_rows,
                          const uint16_t* row_ids);

  // One owning, resizable output column per appended column.
  std::vector<ResizableArrayData> values_;
};
+
+} // namespace compute
+} // namespace arrow
diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc
new file mode 100644
index 0000000000..3f6d478035
--- /dev/null
+++ b/cpp/src/arrow/compute/light_array_test.cc
@@ -0,0 +1,481 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
+
+namespace arrow {
+namespace compute {
+
// Fixed-width types exercised by the tests below; every entry reports
// is_fixed_length == true in its KeyColumnMetadata.
const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
    int8(),   int16(),  int32(),  int64(),           uint8(),
    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
// Varying-length binary types (32-bit offsets).
const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {utf8(), binary()};
+
// Checks the KeyColumnMetadata produced for boolean, null, fixed-width and
// varying-length binary types.
TEST(KeyColumnMetadata, FromDataType) {
  // Boolean: fixed length of 0 signals one bit per value.
  KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean()).ValueOrDie();
  ASSERT_EQ(0, metadata.fixed_length);
  ASSERT_EQ(true, metadata.is_fixed_length);
  ASSERT_EQ(false, metadata.is_null_type);

  metadata = ColumnMetadataFromDataType(null()).ValueOrDie();
  ASSERT_EQ(true, metadata.is_null_type);

  for (const auto& type : kSampleFixedDataTypes) {
    int byte_width =
        arrow::internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
    metadata = ColumnMetadataFromDataType(type).ValueOrDie();
    ASSERT_EQ(byte_width, metadata.fixed_length);
    ASSERT_EQ(true, metadata.is_fixed_length);
    ASSERT_EQ(false, metadata.is_null_type);
  }

  // For varbinary types fixed_length is the offset width in bytes.
  for (const auto& type : {binary(), utf8()}) {
    metadata = ColumnMetadataFromDataType(type).ValueOrDie();
    ASSERT_EQ(4, metadata.fixed_length);
    ASSERT_EQ(false, metadata.is_fixed_length);
    ASSERT_EQ(false, metadata.is_null_type);
  }

  for (const auto& type : {large_binary(), large_utf8()}) {
    metadata = ColumnMetadataFromDataType(type).ValueOrDie();
    ASSERT_EQ(8, metadata.fixed_length);
    ASSERT_EQ(false, metadata.is_fixed_length);
    ASSERT_EQ(false, metadata.is_null_type);
  }
}
+
// Verifies buffer pointers, bit offsets and length computed by
// ColumnArrayFromArrayData for fixed-width types, including sliced inputs.
TEST(KeyColumnArray, FromArrayData) {
  for (const auto& type : kSampleFixedDataTypes) {
    ARROW_SCOPED_TRACE("Type: ", type->ToString());
    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
    // source array) while `offset` is the offset we pass when constructing the
    // KeyColumnArray
    for (auto array_offset : {0, 1}) {
      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
      for (auto offset : {0, 1}) {
        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
        std::shared_ptr<Array> array;
        int byte_width =
            arrow::internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
        if (is_decimal(type->id())) {
          array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])");
        } else {
          array = ArrayFromJSON(type, "[1, 2, null]");
        }
        array = array->Slice(array_offset);
        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
        int buffer_offset_bytes = (offset + array_offset) * byte_width;
        KeyColumnArray kc_array =
            ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie();
        // Maximum tested offset is < 8 so validity is just bit offset
        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
        ASSERT_EQ(0, kc_array.bit_offset(1));
        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
                  kc_array.data(1));
        // Fixed-width types have no variable-length buffer.
        ASSERT_EQ(nullptr, kc_array.data(2));
        ASSERT_EQ(length, kc_array.length());
        // When creating from ArrayData always create read-only
        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
      }
    }
  }
}
+
// Same as FromArrayData but for varbinary types: the offsets buffer takes the
// role of the fixed-length buffer and data(2) points at the character data.
TEST(KeyColumnArray, FromArrayDataBinary) {
  for (const auto& type : kSampleBinaryTypes) {
    ARROW_SCOPED_TRACE("Type: ", type->ToString());
    for (auto array_offset : {0, 1}) {
      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
      for (auto offset : {0, 1}) {
        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
        std::shared_ptr<Array> array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
        // Offset width (4 or 8 bytes) taken from the type's declared layout.
        int offsets_width =
            static_cast<int>(arrow::internal::checked_pointer_cast<BaseBinaryType>(type)
                                 ->layout()
                                 .buffers[1]
                                 .byte_width);
        array = array->Slice(array_offset);
        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
        int buffer_offset_bytes = (offset + array_offset) * offsets_width;
        KeyColumnArray kc_array =
            ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie();
        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
        ASSERT_EQ(0, kc_array.bit_offset(1));
        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
                  kc_array.data(1));
        ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2));
        ASSERT_EQ(length, kc_array.length());
        // When creating from ArrayData always create read-only
        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
      }
    }
  }
}
+
// Boolean arrays store values one bit per row, so both the validity and the
// values buffer carry the same bit offset.
TEST(KeyColumnArray, FromArrayDataBool) {
  for (auto array_offset : {0, 1}) {
    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
    for (auto offset : {0, 1}) {
      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
      std::shared_ptr<Array> array = ArrayFromJSON(boolean(), "[true, false, null]");
      array = array->Slice(array_offset);
      int length = static_cast<int32_t>(array->length()) - offset - array_offset;
      KeyColumnArray kc_array =
          ColumnArrayFromArrayData(array->data(), offset, length).ValueOrDie();
      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1));
      ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
      ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1));
      ASSERT_EQ(length, kc_array.length());
      ASSERT_EQ(nullptr, kc_array.mutable_data(0));
      ASSERT_EQ(nullptr, kc_array.mutable_data(1));
    }
  }
}
+
// Slicing a fixed-width KeyColumnArray: validity advances in bits, values in
// whole elements.
TEST(KeyColumnArray, Slice) {
  constexpr int kValuesByteLength = 128;
  // Size needed for validity depends on byte_width but 16 will always be big enough
  constexpr int kValidityByteLength = 16;
  uint8_t validity_buffer[kValidityByteLength];
  uint8_t values_buffer[kValuesByteLength];
  for (auto byte_width : {2, 4}) {
    ARROW_SCOPED_TRACE("Byte Width: ", byte_width);
    int64_t length = kValuesByteLength / byte_width;
    KeyColumnMetadata metadata(true, byte_width);
    KeyColumnArray array(metadata, length, validity_buffer, values_buffer, nullptr);

    for (int offset : {0, 4, 12}) {
      ARROW_SCOPED_TRACE("Offset: ", offset);
      // NOTE(review): this inner `length` shadows the array length above;
      // consider renaming to `slice_length` for clarity.
      for (int length : {0, 4}) {
        ARROW_SCOPED_TRACE("Length: ", length);
        KeyColumnArray sliced = array.Slice(offset, length);
        int expected_validity_bit_offset = (offset == 0) ? 0 : 4;
        int expected_validity_byte_offset = (offset == 12) ? 1 : 0;
        int expected_values_byte_offset = byte_width * offset;
        ASSERT_EQ(expected_validity_bit_offset, sliced.bit_offset(0));
        ASSERT_EQ(0, sliced.bit_offset(1));
        ASSERT_EQ(validity_buffer + expected_validity_byte_offset,
                  sliced.mutable_data(0));
        ASSERT_EQ(values_buffer + expected_values_byte_offset, sliced.mutable_data(1));
      }
    }
  }
}
+
// Slicing a boolean KeyColumnArray: both validity and values are bit vectors,
// so both buffers get the same byte/bit offsets.
TEST(KeyColumnArray, SliceBool) {
  constexpr int kValuesByteLength = 2;
  constexpr int kValidityByteLength = 2;
  uint8_t validity_buffer[kValidityByteLength];
  uint8_t values_buffer[kValuesByteLength];
  int length = 16;
  KeyColumnMetadata metadata(true, /*byte_width=*/0);
  KeyColumnArray array(metadata, length, validity_buffer, values_buffer, nullptr);

  for (int offset : {0, 4, 12}) {
    ARROW_SCOPED_TRACE("Offset: ", offset);
    for (int length : {0, 4}) {
      ARROW_SCOPED_TRACE("Length: ", length);
      KeyColumnArray sliced = array.Slice(offset, length);
      int expected_bit_offset = (offset == 0) ? 0 : 4;
      int expected_byte_offset = (offset == 12) ? 1 : 0;
      ASSERT_EQ(expected_bit_offset, sliced.bit_offset(0));
      ASSERT_EQ(expected_bit_offset, sliced.bit_offset(1));
      ASSERT_EQ(validity_buffer + expected_byte_offset, sliced.mutable_data(0));
      ASSERT_EQ(values_buffer + expected_byte_offset, sliced.mutable_data(1));
    }
  }
}
+
// ColumnArraysFromExecBatch: int64 maps to fixed_length 8, boolean to 0
// (bit per value); the row-range overload slices all columns.
TEST(KeyColumnArray, FromExecBatch) {
  ExecBatch batch =
      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
  std::vector<KeyColumnArray> arrays;
  ASSERT_OK(ColumnArraysFromExecBatch(batch, &arrays));

  ASSERT_EQ(2, arrays.size());
  ASSERT_EQ(8, arrays[0].metadata().fixed_length);
  ASSERT_EQ(0, arrays[1].metadata().fixed_length);
  ASSERT_EQ(3, arrays[0].length());
  ASSERT_EQ(3, arrays[1].length());

  // Row-range overload: one row starting at row 1.
  ASSERT_OK(ColumnArraysFromExecBatch(batch, 1, 1, &arrays));

  ASSERT_EQ(2, arrays.size());
  ASSERT_EQ(8, arrays[0].metadata().fixed_length);
  ASSERT_EQ(0, arrays[1].metadata().fixed_length);
  ASSERT_EQ(1, arrays[0].length());
  ASSERT_EQ(1, arrays[1].length());
}
+
// Allocation behavior of ResizableArrayData: log_num_rows_min floor, growth,
// no shrink on downsize, and release on destruction.
TEST(ResizableArrayData, Basic) {
  std::unique_ptr<MemoryPool> pool = MemoryPool::CreateDefault();
  for (const auto& type : kSampleFixedDataTypes) {
    ARROW_SCOPED_TRACE("Type: ", type->ToString());
    int byte_width =
        arrow::internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
    {
      ResizableArrayData array;
      array.Init(type, pool.get(), /*log_num_rows_min=*/16);
      ASSERT_EQ(0, array.num_rows());
      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
      ASSERT_EQ(2, array.num_rows());
      // Even though we are only asking for 2 rows we specified a rather high
      // log_num_rows_min so it should allocate at least that many rows.  Padding
      // and rounding up to a power of 2 will make the allocations larger.
      int min_bytes_needed_for_values = byte_width * (1 << 16);
      int min_bytes_needed_for_validity = (1 << 16) / 8;
      int min_bytes_needed = min_bytes_needed_for_values + min_bytes_needed_for_validity;
      ASSERT_LT(min_bytes_needed, pool->bytes_allocated());
      ASSERT_GT(min_bytes_needed * 2, pool->bytes_allocated());

      ASSERT_OK(array.ResizeFixedLengthBuffers(1 << 17));
      ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated());
      ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated());
      ASSERT_EQ(1 << 17, array.num_rows());

      // Shrinking array won't shrink allocated RAM
      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
      ASSERT_LT(min_bytes_needed * 2, pool->bytes_allocated());
      ASSERT_GT(min_bytes_needed * 4, pool->bytes_allocated());
      ASSERT_EQ(2, array.num_rows());
    }
    // After array is destroyed buffers should be freed
    ASSERT_EQ(0, pool->bytes_allocated());
  }
}
+
// Varbinary ResizableArrayData: offsets must be filled before
// ResizeVaryingLengthBuffer can size the character buffer from them.
TEST(ResizableArrayData, Binary) {
  std::unique_ptr<MemoryPool> pool = MemoryPool::CreateDefault();
  for (const auto& type : kSampleBinaryTypes) {
    ARROW_SCOPED_TRACE("Type: ", type->ToString());
    {
      ResizableArrayData array;
      array.Init(type, pool.get(), /*log_num_rows_min=*/4);
      ASSERT_EQ(0, array.num_rows());
      ASSERT_OK(array.ResizeFixedLengthBuffers(2));
      ASSERT_EQ(2, array.num_rows());
      // At this point the offsets memory has been allocated and needs to be filled
      // in before we allocate the variable length memory
      int offsets_width =
          static_cast<int>(arrow::internal::checked_pointer_cast<BaseBinaryType>(type)
                               ->layout()
                               .buffers[1]
                               .byte_width);
      if (offsets_width == 4) {
        auto offsets = reinterpret_cast<int32_t*>(array.mutable_data(1));
        offsets[0] = 0;
        offsets[1] = 1000;
        offsets[2] = 2000;
      } else if (offsets_width == 8) {
        auto offsets = reinterpret_cast<int64_t*>(array.mutable_data(1));
        offsets[0] = 0;
        offsets[1] = 1000;
        offsets[2] = 2000;
      } else {
        FAIL() << "Unexpected offsets_width: " << offsets_width;
      }
      ASSERT_OK(array.ResizeVaryingLengthBuffer());
      // Each string is 1000 bytes. The offsets, padding, etc. should be less than 1000
      // bytes
      ASSERT_LT(2000, pool->bytes_allocated());
      ASSERT_GT(3000, pool->bytes_allocated());
    }
    // After array is destroyed buffers should be freed
    ASSERT_EQ(0, pool->bytes_allocated());
  }
}
+
// Two whole batches appended back-to-back produce their concatenation, and
// Flush releases all memory back to the pool.
TEST(ExecBatchBuilder, AppendBatches) {
  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
  MemoryPool* pool = owned_pool.get();
  ExecBatch batch_one =
      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
  ExecBatch batch_two =
      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
  ExecBatch combined = ExecBatchFromJSON(
      {int64(), boolean()},
      "[[1, true], [2, false], [null, null], [null, true], [5, true], [6, false]]");
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[3] = {0, 1, 2};
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2));
    ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/2));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(combined, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  ASSERT_EQ(0, pool->bytes_allocated());
}
+
// Row selection: only the rows named in row_ids are appended from each batch.
TEST(ExecBatchBuilder, AppendBatchesSomeRows) {
  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
  MemoryPool* pool = owned_pool.get();
  ExecBatch batch_one =
      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
  ExecBatch batch_two =
      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
  ExecBatch combined = ExecBatchFromJSON(
      {int64(), boolean()}, "[[1, true], [2, false], [null, true], [5, true]]");
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[2] = {0, 1};
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 2, row_ids, /*num_cols=*/2));
    ASSERT_OK(builder.AppendSelected(pool, batch_two, 2, row_ids, /*num_cols=*/2));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(combined, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  ASSERT_EQ(0, pool->bytes_allocated());
}
+
// Column selection via col_ids, including the implicit "first num_cols columns"
// behavior when col_ids is null.
TEST(ExecBatchBuilder, AppendBatchesSomeCols) {
  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
  MemoryPool* pool = owned_pool.get();
  ExecBatch batch_one =
      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
  ExecBatch batch_two =
      ExecBatchFromJSON({int64(), boolean()}, "[[null, true], [5, true], [6, false]]");
  ExecBatch first_col_only =
      ExecBatchFromJSON({int64()}, "[[1], [2], [null], [null], [5], [6]]");
  ExecBatch last_col_only = ExecBatchFromJSON(
      {boolean()}, "[[true], [false], [null], [true], [true], [false]]");
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[3] = {0, 1, 2};
    int first_col_ids[1] = {0};
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1,
                                     first_col_ids));
    ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1,
                                     first_col_ids));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(first_col_only, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[3] = {0, 1, 2};
    // If we don't specify col_ids and num_cols is 1 it is implicitly the first col
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1));
    ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(first_col_only, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[3] = {0, 1, 2};
    int last_col_ids[1] = {1};
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/1,
                                     last_col_ids));
    ASSERT_OK(builder.AppendSelected(pool, batch_two, 3, row_ids, /*num_cols=*/1,
                                     last_col_ids));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(last_col_only, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  ASSERT_EQ(0, pool->bytes_allocated());
}
+
// AppendNulls works both after real data has been appended and as the very
// first append (which must initialize the output columns).
TEST(ExecBatchBuilder, AppendNulls) {
  std::unique_ptr<MemoryPool> owned_pool = MemoryPool::CreateDefault();
  MemoryPool* pool = owned_pool.get();
  ExecBatch batch_one =
      ExecBatchFromJSON({int64(), boolean()}, "[[1, true], [2, false], [null, null]]");
  ExecBatch combined = ExecBatchFromJSON(
      {int64(), boolean()},
      "[[1, true], [2, false], [null, null], [null, null], [null, null]]");
  ExecBatch just_nulls =
      ExecBatchFromJSON({int64(), boolean()}, "[[null, null], [null, null]]");
  {
    ExecBatchBuilder builder;
    uint16_t row_ids[3] = {0, 1, 2};
    ASSERT_OK(builder.AppendSelected(pool, batch_one, 3, row_ids, /*num_cols=*/2));
    ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(combined, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  {
    ExecBatchBuilder builder;
    ASSERT_OK(builder.AppendNulls(pool, {int64(), boolean()}, 2));
    ExecBatch built = builder.Flush();
    ASSERT_EQ(just_nulls, built);
    ASSERT_NE(0, pool->bytes_allocated());
  }
  ASSERT_EQ(0, pool->bytes_allocated());
}
+
+TEST(ExecBatchBuilder, AppendNullsBeyondLimit) {
+ // Appending more null rows than the builder's fixed row capacity must fail
+ // with CapacityError while leaving the rows already accepted intact.
+ std::unique_ptr<MemoryPool> pool_owner = MemoryPool::CreateDefault();
+ MemoryPool* pool = pool_owner.get();
+ int capacity = ExecBatchBuilder::num_rows_max();
+ {
+ ExecBatchBuilder batch_builder;
+ ASSERT_OK(batch_builder.AppendNulls(pool, {int64(), boolean()}, 10));
+ // One row over the limit: the append is rejected as a whole.
+ ASSERT_RAISES(CapacityError,
+ batch_builder.AppendNulls(pool, {int64(), boolean()}, capacity + 1 - 10));
+ ExecBatch result = batch_builder.Flush();
+ ASSERT_EQ(10, result.length);
+ ASSERT_NE(0, pool->bytes_allocated());
+ }
+ ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+TEST(ExecBatchBuilder, AppendValuesBeyondLimit) {
+ // Selecting more rows than the builder's fixed capacity must raise
+ // CapacityError; rows appended before the failing call are preserved.
+ std::unique_ptr<MemoryPool> pool_owner = MemoryPool::CreateDefault();
+ MemoryPool* pool = pool_owner.get();
+ int capacity = ExecBatchBuilder::num_rows_max();
+ std::shared_ptr<Array> values = ConstantArrayGenerator::Int32(capacity + 1);
+ std::shared_ptr<Array> trimmed_values = ConstantArrayGenerator::Int32(10);
+ ExecBatch batch({values}, capacity + 1);
+ ExecBatch trimmed_batch({trimmed_values}, 10);
+ // Row ids [0, 10) followed by row ids [10, capacity + 1)
+ std::vector<uint16_t> head_ids(10);
+ std::iota(head_ids.begin(), head_ids.end(), 0);
+ std::vector<uint16_t> tail_ids(capacity + 1 - 10);
+ std::iota(tail_ids.begin(), tail_ids.end(), 10);
+ {
+ ExecBatchBuilder batch_builder;
+ ASSERT_OK(batch_builder.AppendSelected(pool, batch, 10, head_ids.data(),
+ /*num_cols=*/1));
+ // The second selection would overflow the builder's capacity.
+ ASSERT_RAISES(CapacityError,
+ batch_builder.AppendSelected(pool, batch, capacity + 1 - 10,
+ tail_ids.data(),
+ /*num_cols=*/1));
+ ExecBatch result = batch_builder.Flush();
+ ASSERT_EQ(trimmed_batch, result);
+ ASSERT_NE(0, pool->bytes_allocated());
+ }
+ ASSERT_EQ(0, pool->bytes_allocated());
+}
+
+} // namespace compute
+} // namespace arrow