You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/11/06 17:48:42 UTC
(arrow) branch main updated: GH-38381: [C++][Acero] Create a sorted merge node (#38380)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b55d8d5664 GH-38381: [C++][Acero] Create a sorted merge node (#38380)
b55d8d5664 is described below
commit b55d8d5664480f63c2be75f0e18eeb006b640427
Author: Jeremy Aguilon <je...@gmail.com>
AuthorDate: Mon Nov 6 12:48:35 2023 -0500
GH-38381: [C++][Acero] Create a sorted merge node (#38380)
### Rationale for this change
This is an implementation of a node that can merge N sorted inputs (only in ascending order for a first pass).
Where possible I have shared components with `asof_join_node.cc`.
Full description/use case is described in https://github.com/apache/arrow/issues/38381
### What changes are included in this PR?
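* Extract the reusable internals of `asof_join_node.cc` (concurrent queue, backpressure handler, unmaterialized table, time-series utilities) into shared headers so that data can be streamed top to bottom and consumed in a non-blocking manner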
* Implement a sorted merger
### Are these changes tested?
Basic test added. Locally I have tested this on 100+ gigabytes of parquet, sharded across 50+ files. Happy to add a benchmark test on top of the basic test, but submitting now for code feedback.
### Are there any user-facing changes?
Yes, `sorted_merge` is now an exposed declaration
Lead-authored-by: Jeremy Aguilon <je...@gmail.com>
Co-authored-by: jeremy <je...@gmail.com>
Co-authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/acero/CMakeLists.txt | 6 +-
cpp/src/arrow/acero/asof_join_node.cc | 432 +++--------------
cpp/src/arrow/acero/backpressure_handler.h | 74 +++
cpp/src/arrow/acero/concurrent_queue_internal.h | 161 +++++++
cpp/src/arrow/acero/exec_plan.cc | 2 +
cpp/src/arrow/acero/sorted_merge_node.cc | 609 ++++++++++++++++++++++++
cpp/src/arrow/acero/sorted_merge_node_test.cc | 87 ++++
cpp/src/arrow/acero/time_series_util.cc | 63 +++
cpp/src/arrow/acero/time_series_util.h | 31 ++
cpp/src/arrow/acero/unmaterialized_table.h | 271 +++++++++++
10 files changed, 1357 insertions(+), 379 deletions(-)
diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index 153413b33c..b77d52a23e 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -49,9 +49,11 @@ set(ARROW_ACERO_SRCS
project_node.cc
query_context.cc
sink_node.cc
+ sorted_merge_node.cc
source_node.cc
swiss_join.cc
task_util.cc
+ time_series_util.cc
tpch_node.cc
union_node.cc
util.cc)
@@ -173,11 +175,13 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
test_nodes.cc)
-# asof_join_node uses std::thread internally
+# asof_join_node and sorted_merge_node use std::thread internally
# and doesn't use ThreadPool so it will
# be broken if threading is turned off
if(ARROW_ENABLE_THREADING)
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+ add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc
+ test_nodes.cc)
endif()
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index d19d2db299..4a3b6b199c 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,8 @@
// under the License.
#include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/backpressure_handler.h"
+#include "arrow/acero/concurrent_queue_internal.h"
#include <atomic>
#include <condition_variable>
@@ -30,6 +32,7 @@
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
+#include "arrow/acero/unmaterialized_table.h"
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
@@ -41,6 +44,7 @@
#ifndef NDEBUG
#include "arrow/compute/function_internal.h"
#endif
+#include "arrow/acero/time_series_util.h"
#include "arrow/compute/key_hash.h"
#include "arrow/compute/light_array.h"
#include "arrow/record_batch.h"
@@ -122,92 +126,12 @@ struct TolType {
typedef uint64_t row_index_t;
typedef int col_index_t;
-// normalize the value to 64-bits while preserving ordering of values
-template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
-static inline uint64_t time_value(T t) {
- uint64_t bias = std::is_signed<T>::value ? (uint64_t)1 << (8 * sizeof(T) - 1) : 0;
- return t < 0 ? static_cast<uint64_t>(t + bias) : static_cast<uint64_t>(t);
-}
-
// indicates normalization of a key value
template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
static inline uint64_t key_value(T t) {
return static_cast<uint64_t>(t);
}
-/**
- * Simple implementation for an unbound concurrent queue
- */
-template <class T>
-class ConcurrentQueue {
- public:
- T Pop() {
- std::unique_lock<std::mutex> lock(mutex_);
- cond_.wait(lock, [&] { return !queue_.empty(); });
- return PopUnlocked();
- }
-
- T PopUnlocked() {
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
-
- void Push(const T& item) {
- std::unique_lock<std::mutex> lock(mutex_);
- return PushUnlocked(item);
- }
-
- void PushUnlocked(const T& item) {
- queue_.push(item);
- cond_.notify_one();
- }
-
- void Clear() {
- std::unique_lock<std::mutex> lock(mutex_);
- ClearUnlocked();
- }
-
- void ClearUnlocked() { queue_ = std::queue<T>(); }
-
- std::optional<T> TryPop() {
- std::unique_lock<std::mutex> lock(mutex_);
- return TryPopUnlocked();
- }
-
- std::optional<T> TryPopUnlocked() {
- // Try to pop the oldest value from the queue (or return nullopt if none)
- if (queue_.empty()) {
- return std::nullopt;
- } else {
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
- }
-
- bool Empty() const {
- std::unique_lock<std::mutex> lock(mutex_);
- return queue_.empty();
- }
-
- // Un-synchronized access to front
- // For this to be "safe":
- // 1) the caller logically guarantees that queue is not empty
- // 2) pop/try_pop cannot be called concurrently with this
- const T& UnsyncFront() const { return queue_.front(); }
-
- size_t UnsyncSize() const { return queue_.size(); }
-
- protected:
- std::mutex& GetMutex() { return mutex_; }
-
- private:
- std::queue<T> queue_;
- mutable std::mutex mutex_;
- std::condition_variable cond_;
-};
-
class AsofJoinNode;
#ifndef NDEBUG
@@ -547,104 +471,6 @@ class BackpressureController : public BackpressureControl {
std::atomic<int32_t>& backpressure_counter_;
};
-class BackpressureHandler {
- private:
- BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
- std::unique_ptr<BackpressureControl> backpressure_control)
- : input_(input),
- low_threshold_(low_threshold),
- high_threshold_(high_threshold),
- backpressure_control_(std::move(backpressure_control)) {}
-
- public:
- static Result<BackpressureHandler> Make(
- ExecNode* input, size_t low_threshold, size_t high_threshold,
- std::unique_ptr<BackpressureControl> backpressure_control) {
- if (low_threshold >= high_threshold) {
- return Status::Invalid("low threshold (", low_threshold,
- ") must be less than high threshold (", high_threshold, ")");
- }
- if (backpressure_control == NULLPTR) {
- return Status::Invalid("null backpressure control parameter");
- }
- BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
- std::move(backpressure_control));
- return std::move(backpressure_handler);
- }
-
- void Handle(size_t start_level, size_t end_level) {
- if (start_level < high_threshold_ && end_level >= high_threshold_) {
- backpressure_control_->Pause();
- } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
- backpressure_control_->Resume();
- }
- }
-
- Status ForceShutdown() {
- // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
- // Since acero's executor won't terminate if any one node is paused, we need to
- // force resume the node before stopping production.
- backpressure_control_->Resume();
- return input_->StopProducing();
- }
-
- private:
- ExecNode* input_;
- size_t low_threshold_;
- size_t high_threshold_;
- std::unique_ptr<BackpressureControl> backpressure_control_;
-};
-
-template <typename T>
-class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
- private:
- struct DoHandle {
- explicit DoHandle(BackpressureConcurrentQueue& queue)
- : queue_(queue), start_size_(queue_.UnsyncSize()) {}
-
- ~DoHandle() {
- size_t end_size = queue_.UnsyncSize();
- queue_.handler_.Handle(start_size_, end_size);
- }
-
- BackpressureConcurrentQueue& queue_;
- size_t start_size_;
- };
-
- public:
- explicit BackpressureConcurrentQueue(BackpressureHandler handler)
- : handler_(std::move(handler)) {}
-
- T Pop() {
- std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
- DoHandle do_handle(*this);
- return ConcurrentQueue<T>::PopUnlocked();
- }
-
- void Push(const T& item) {
- std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
- DoHandle do_handle(*this);
- ConcurrentQueue<T>::PushUnlocked(item);
- }
-
- void Clear() {
- std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
- DoHandle do_handle(*this);
- ConcurrentQueue<T>::ClearUnlocked();
- }
-
- std::optional<T> TryPop() {
- std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
- DoHandle do_handle(*this);
- return ConcurrentQueue<T>::TryPopUnlocked();
- }
-
- Status ForceShutdown() { return handler_.ForceShutdown(); }
-
- private:
- BackpressureHandler handler_;
-};
-
class InputState {
// InputState correponds to an input
// Input record batches are queued up in InputState until processed and
@@ -783,29 +609,8 @@ class InputState {
}
inline OnType GetLatestTime() const {
- return GetTime(GetLatestBatch().get(), latest_ref_row_);
- }
-
- inline ByType GetTime(const RecordBatch* batch, row_index_t row) const {
- auto data = batch->column_data(time_col_index_);
- switch (time_type_id_) {
- LATEST_VAL_CASE(INT8, time_value)
- LATEST_VAL_CASE(INT16, time_value)
- LATEST_VAL_CASE(INT32, time_value)
- LATEST_VAL_CASE(INT64, time_value)
- LATEST_VAL_CASE(UINT8, time_value)
- LATEST_VAL_CASE(UINT16, time_value)
- LATEST_VAL_CASE(UINT32, time_value)
- LATEST_VAL_CASE(UINT64, time_value)
- LATEST_VAL_CASE(DATE32, time_value)
- LATEST_VAL_CASE(DATE64, time_value)
- LATEST_VAL_CASE(TIME32, time_value)
- LATEST_VAL_CASE(TIME64, time_value)
- LATEST_VAL_CASE(TIMESTAMP, time_value)
- default:
- DCHECK(false);
- return 0; // cannot happen
- }
+ return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+ latest_ref_row_);
}
#undef LATEST_VAL_CASE
@@ -832,7 +637,9 @@ class InputState {
have_active_batch &= !queue_.TryPop();
if (have_active_batch) {
DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed
- memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed
+ memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), time_type_id_,
+ time_col_index_,
+ 0)); // time changed
}
}
}
@@ -988,35 +795,25 @@ class InputState {
std::vector<std::optional<col_index_t>> src_to_dst_;
};
+/// Wrapper around UnmaterializedCompositeTable that knows how to emplace
+/// the join row-by-row
template <size_t MAX_TABLES>
-struct CompositeReferenceRow {
- struct Entry {
- arrow::RecordBatch* batch; // can be NULL if there's no value
- row_index_t row;
- };
- Entry refs[MAX_TABLES];
-};
+class CompositeTableBuilder {
+ using SliceBuilder = UnmaterializedSliceBuilder<MAX_TABLES>;
+ using CompositeTable = UnmaterializedCompositeTable<MAX_TABLES>;
-// A table of composite reference rows. Rows maintain pointers to the
-// constituent record batches, but the overall table retains shared_ptr
-// references to ensure memory remains resident while the table is live.
-//
-// The main reason for this is that, especially for wide tables, joins
-// are effectively row-oriented, rather than column-oriented. Separating
-// the join part from the columnar materialization part simplifies the
-// logic around data types and increases efficiency.
-//
-// We don't put the shared_ptr's into the rows for efficiency reasons.
-template <size_t MAX_TABLES>
-class CompositeReferenceTable {
public:
- NDEBUG_EXPLICIT CompositeReferenceTable(DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
- : DEBUG_ADD(n_tables_(n_tables), node_(node)) {
+ NDEBUG_EXPLICIT CompositeTableBuilder(
+ const std::vector<std::unique_ptr<InputState>>& inputs,
+ const std::shared_ptr<Schema>& schema, arrow::MemoryPool* pool,
+ DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
+ : unmaterialized_table(InitUnmaterializedTable(schema, inputs, pool)),
+ DEBUG_ADD(n_tables_(n_tables), node_(node)) {
DCHECK_GE(n_tables_, 1);
DCHECK_LE(n_tables_, MAX_TABLES);
}
- size_t n_rows() const { return rows_.size(); }
+ size_t n_rows() const { return unmaterialized_table.Size(); }
// Adds the latest row from the input state as a new composite reference row
// - LHS must have a valid key,timestep,and latest rows
@@ -1037,14 +834,16 @@ class CompositeReferenceTable {
// On the first row of the batch, we resize the destination.
// The destination size is dictated by the size of the LHS batch.
row_index_t new_batch_size = lhs_latest_batch->num_rows();
- row_index_t new_capacity = rows_.size() + new_batch_size;
- if (rows_.capacity() < new_capacity) rows_.reserve(new_capacity);
+ row_index_t new_capacity = unmaterialized_table.Size() + new_batch_size;
+ if (unmaterialized_table.capacity() < new_capacity) {
+ unmaterialized_table.reserve(new_capacity);
+ }
}
- rows_.resize(rows_.size() + 1);
- auto& row = rows_.back();
- row.refs[0].batch = lhs_latest_batch.get();
- row.refs[0].row = lhs_latest_row;
- AddRecordBatchRef(lhs_latest_batch);
+
+ SliceBuilder new_row{&unmaterialized_table};
+
+ // Each item represents a portion of the columns of the output table
+ new_row.AddEntry(lhs_latest_batch, lhs_latest_row, lhs_latest_row + 1);
DEBUG_SYNC(node_, "Emplace: key=", key, " lhs_latest_row=", lhs_latest_row,
" lhs_latest_time=", lhs_latest_time, DEBUG_MANIP(std::endl));
@@ -1068,100 +867,25 @@ class CompositeReferenceTable {
if (tolerance.Accepts(lhs_latest_time, (*opt_entry)->time)) {
// Have a valid entry
const MemoStore::Entry* entry = *opt_entry;
- row.refs[i].batch = entry->batch.get();
- row.refs[i].row = entry->row;
- AddRecordBatchRef(entry->batch);
+ new_row.AddEntry(entry->batch, entry->row, entry->row + 1);
continue;
}
}
- row.refs[i].batch = NULL;
- row.refs[i].row = 0;
+ new_row.AddEntry(nullptr, 0, 1);
}
+ new_row.Finalize();
}
// Materializes the current reference table into a target record batch
- Result<std::shared_ptr<RecordBatch>> Materialize(
- MemoryPool* memory_pool, const std::shared_ptr<arrow::Schema>& output_schema,
- const std::vector<std::unique_ptr<InputState>>& state) {
- DCHECK_EQ(state.size(), n_tables_);
-
- // Don't build empty batches
- size_t n_rows = rows_.size();
- if (!n_rows) return NULLPTR;
-
- // Build the arrays column-by-column from the rows
- std::vector<std::shared_ptr<arrow::Array>> arrays(output_schema->num_fields());
- for (size_t i_table = 0; i_table < n_tables_; ++i_table) {
- int n_src_cols = state.at(i_table)->get_schema()->num_fields();
- {
- for (col_index_t i_src_col = 0; i_src_col < n_src_cols; ++i_src_col) {
- std::optional<col_index_t> i_dst_col_opt =
- state[i_table]->MapSrcToDst(i_src_col);
- if (!i_dst_col_opt) continue;
- col_index_t i_dst_col = *i_dst_col_opt;
- const auto& src_field = state[i_table]->get_schema()->field(i_src_col);
- const auto& dst_field = output_schema->field(i_dst_col);
- DCHECK(src_field->type()->Equals(dst_field->type()));
- DCHECK_EQ(src_field->name(), dst_field->name());
- const auto& field_type = src_field->type();
-
-#define ASOFJOIN_MATERIALIZE_CASE(id) \
- case Type::id: { \
- using T = typename TypeIdTraits<Type::id>::Type; \
- ARROW_ASSIGN_OR_RAISE( \
- arrays.at(i_dst_col), \
- MaterializeColumn<T>(memory_pool, field_type, i_table, i_src_col)); \
- break; \
- }
-
- switch (field_type->id()) {
- ASOFJOIN_MATERIALIZE_CASE(BOOL)
- ASOFJOIN_MATERIALIZE_CASE(INT8)
- ASOFJOIN_MATERIALIZE_CASE(INT16)
- ASOFJOIN_MATERIALIZE_CASE(INT32)
- ASOFJOIN_MATERIALIZE_CASE(INT64)
- ASOFJOIN_MATERIALIZE_CASE(UINT8)
- ASOFJOIN_MATERIALIZE_CASE(UINT16)
- ASOFJOIN_MATERIALIZE_CASE(UINT32)
- ASOFJOIN_MATERIALIZE_CASE(UINT64)
- ASOFJOIN_MATERIALIZE_CASE(FLOAT)
- ASOFJOIN_MATERIALIZE_CASE(DOUBLE)
- ASOFJOIN_MATERIALIZE_CASE(DATE32)
- ASOFJOIN_MATERIALIZE_CASE(DATE64)
- ASOFJOIN_MATERIALIZE_CASE(TIME32)
- ASOFJOIN_MATERIALIZE_CASE(TIME64)
- ASOFJOIN_MATERIALIZE_CASE(TIMESTAMP)
- ASOFJOIN_MATERIALIZE_CASE(STRING)
- ASOFJOIN_MATERIALIZE_CASE(LARGE_STRING)
- ASOFJOIN_MATERIALIZE_CASE(BINARY)
- ASOFJOIN_MATERIALIZE_CASE(LARGE_BINARY)
- default:
- return Status::Invalid("Unsupported data type ",
- src_field->type()->ToString(), " for field ",
- src_field->name());
- }
-
-#undef ASOFJOIN_MATERIALIZE_CASE
- }
- }
- }
-
- // Build the result
- DCHECK_LE(n_rows, (uint64_t)std::numeric_limits<int64_t>::max());
- std::shared_ptr<arrow::RecordBatch> r =
- arrow::RecordBatch::Make(output_schema, (int64_t)n_rows, arrays);
- return r;
+ Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+ return unmaterialized_table.Materialize();
}
// Returns true if there are no rows
- bool empty() const { return rows_.empty(); }
+ bool empty() const { return unmaterialized_table.Empty(); }
private:
- // Contains shared_ptr refs for all RecordBatches referred to by the contents of rows_
- std::unordered_map<uintptr_t, std::shared_ptr<RecordBatch>> _ptr2ref;
-
- // Row table references
- std::vector<CompositeReferenceRow<MAX_TABLES>> rows_;
+ CompositeTable unmaterialized_table;
// Total number of tables in the composite table
size_t n_tables_;
@@ -1171,70 +895,20 @@ class CompositeReferenceTable {
AsofJoinNode* node_;
#endif
- // Adds a RecordBatch ref to the mapping, if needed
- void AddRecordBatchRef(const std::shared_ptr<RecordBatch>& ref) {
- if (!_ptr2ref.count((uintptr_t)ref.get())) _ptr2ref[(uintptr_t)ref.get()] = ref;
- }
-
- template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
- enable_if_boolean<Type, Status> static BuilderAppend(
- Builder& builder, const std::shared_ptr<ArrayData>& source, row_index_t row) {
- if (source->IsNull(row)) {
- builder.UnsafeAppendNull();
- return Status::OK();
- }
- builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
- return Status::OK();
- }
-
- template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
- enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
- Status> static BuilderAppend(Builder& builder,
- const std::shared_ptr<ArrayData>& source,
- row_index_t row) {
- if (source->IsNull(row)) {
- builder.UnsafeAppendNull();
- return Status::OK();
- }
- using CType = typename TypeTraits<Type>::CType;
- builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
- return Status::OK();
- }
-
- template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
- enable_if_base_binary<Type, Status> static BuilderAppend(
- Builder& builder, const std::shared_ptr<ArrayData>& source, row_index_t row) {
- if (source->IsNull(row)) {
- return builder.AppendNull();
- }
- using offset_type = typename Type::offset_type;
- const uint8_t* data = source->buffers[2]->data();
- const offset_type* offsets = source->GetValues<offset_type>(1);
- const offset_type offset0 = offsets[row];
- const offset_type offset1 = offsets[row + 1];
- return builder.Append(data + offset0, offset1 - offset0);
- }
-
- template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
- Result<std::shared_ptr<Array>> MaterializeColumn(MemoryPool* memory_pool,
- const std::shared_ptr<DataType>& type,
- size_t i_table, col_index_t i_col) {
- ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool));
- Builder& builder = *checked_cast<Builder*>(a_builder.get());
- ARROW_RETURN_NOT_OK(builder.Reserve(rows_.size()));
- for (row_index_t i_row = 0; i_row < rows_.size(); ++i_row) {
- const auto& ref = rows_[i_row].refs[i_table];
- if (ref.batch) {
- Status st =
- BuilderAppend<Type, Builder>(builder, ref.batch->column_data(i_col), ref.row);
- ARROW_RETURN_NOT_OK(st);
- } else {
- builder.UnsafeAppendNull();
+ static CompositeTable InitUnmaterializedTable(
+ const std::shared_ptr<Schema>& schema,
+ const std::vector<std::unique_ptr<InputState>>& inputs, arrow::MemoryPool* pool) {
+ std::unordered_map<int, std::pair<int, int>> dst_to_src;
+ for (size_t i = 0; i < inputs.size(); i++) {
+ auto& input = inputs[i];
+ for (int src = 0; src < input->get_schema()->num_fields(); src++) {
+ auto dst = input->MapSrcToDst(src);
+ if (dst.has_value()) {
+ dst_to_src[dst.value()] = std::make_pair(static_cast<int>(i), src);
+ }
}
}
- std::shared_ptr<Array> result;
- ARROW_RETURN_NOT_OK(builder.Finish(&result));
- return result;
+ return CompositeTable{schema, inputs.size(), dst_to_src, pool};
}
};
@@ -1279,7 +953,9 @@ class AsofJoinNode : public ExecNode {
auto& lhs = *state_.at(0);
// Construct new target table if needed
- CompositeReferenceTable<MAX_JOIN_TABLES> dst(DEBUG_ADD(state_.size(), this));
+ CompositeTableBuilder<MAX_JOIN_TABLES> dst(state_, output_schema_,
+ plan()->query_context()->memory_pool(),
+ DEBUG_ADD(state_.size(), this));
// Generate rows into the dst table until we either run out of data or hit the row
// limit, or run out of input
@@ -1318,8 +994,8 @@ class AsofJoinNode : public ExecNode {
if (dst.empty()) {
return NULLPTR;
} else {
- return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
- state_);
+ ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
+ return out.has_value() ? out.value() : NULLPTR;
}
}
diff --git a/cpp/src/arrow/acero/backpressure_handler.h b/cpp/src/arrow/acero/backpressure_handler.h
new file mode 100644
index 0000000000..178272315d
--- /dev/null
+++ b/cpp/src/arrow/acero/backpressure_handler.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+
+#include <memory>
+
+namespace arrow::acero {
+
+class BackpressureHandler {  // calls Pause()/Resume() on a BackpressureControl
+ private:
+ BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
+ std::unique_ptr<BackpressureControl> backpressure_control)
+ : input_(input),
+ low_threshold_(low_threshold),
+ high_threshold_(high_threshold),
+ backpressure_control_(std::move(backpressure_control)) {}
+
+ public:
+ static Result<BackpressureHandler> Make(  // validating factory; the ctor is private
+ ExecNode* input, size_t low_threshold, size_t high_threshold,
+ std::unique_ptr<BackpressureControl> backpressure_control) {
+ if (low_threshold >= high_threshold) {
+ return Status::Invalid("low threshold (", low_threshold,
+ ") must be less than high threshold (", high_threshold, ")");
+ }
+ if (backpressure_control == NULLPTR) {
+ return Status::Invalid("null backpressure control parameter");
+ }
+ BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
+ std::move(backpressure_control));
+ return std::move(backpressure_handler);
+ }
+
+ void Handle(size_t start_level, size_t end_level) {  // acts only on a threshold crossing
+ if (start_level < high_threshold_ && end_level >= high_threshold_) {  // rose to/above high
+ backpressure_control_->Pause();
+ } else if (start_level > low_threshold_ && end_level <= low_threshold_) {  // fell to/below low
+ backpressure_control_->Resume();
+ }
+ }
+
+ Status ForceShutdown() {
+ // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+ // Since acero's executor won't terminate if any one node is paused, we need to
+ // force resume the node before stopping production.
+ backpressure_control_->Resume();
+ return input_->StopProducing();
+ }
+
+ private:
+ ExecNode* input_;  // node whose StopProducing() is called by ForceShutdown()
+ size_t low_threshold_;  // strictly less than high_threshold_ (checked in Make)
+ size_t high_threshold_;
+ std::unique_ptr<BackpressureControl> backpressure_control_;  // non-null (checked in Make)
+};
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/concurrent_queue_internal.h b/cpp/src/arrow/acero/concurrent_queue_internal.h
new file mode 100644
index 0000000000..f530394187
--- /dev/null
+++ b/cpp/src/arrow/acero/concurrent_queue_internal.h
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for a thread-safe blocking unbounded multi-consumer /
+ * multi-producer concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+ // Pops the oldest item from the queue, blocking until the queue is non-empty
+ //
+ T Pop() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ cond_.wait(lock, [&] { return !queue_.empty(); });
+ return PopUnlocked();
+ }
+
+ // Pops the oldest item from the queue, or returns a nullopt if empty
+ //
+ std::optional<T> TryPop() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ return TryPopUnlocked();
+ }
+
+ // Pushes an item to the queue
+ //
+ void Push(const T& item) {
+ std::unique_lock<std::mutex> lock(mutex_);
+ return PushUnlocked(item);
+ }
+
+ // Clears the queue
+ //
+ void Clear() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ ClearUnlocked();
+ }
+
+ bool Empty() const {
+ std::unique_lock<std::mutex> lock(mutex_);
+ return queue_.empty();
+ }
+
+ // Un-synchronized access to front
+ // For this to be "safe":
+ // 1) the caller logically guarantees that queue is not empty
+ // 2) pop/try_pop cannot be called concurrently with this
+ const T& UnsyncFront() const { return queue_.front(); }
+
+ size_t UnsyncSize() const { return queue_.size(); }  // reads the size without locking
+
+ protected:
+ std::mutex& GetMutex() { return mutex_; }  // lets subclasses lock around *Unlocked calls
+
+ T PopUnlocked() {  // takes no lock and does not check for emptiness
+ auto item = queue_.front();
+ queue_.pop();
+ return item;
+ }
+
+ void PushUnlocked(const T& item) {  // takes no lock
+ queue_.push(item);
+ cond_.notify_one();  // wakes one thread waiting in Pop(), if any
+ }
+
+ void ClearUnlocked() { queue_ = std::queue<T>(); }  // replaces the queue with an empty one
+
+ std::optional<T> TryPopUnlocked() {
+ // Try to pop the oldest value from the queue (or return nullopt if none)
+ if (queue_.empty()) {
+ return std::nullopt;
+ } else {
+ auto item = queue_.front();
+ queue_.pop();
+ return item;
+ }
+ }
+ std::queue<T> queue_;
+
+ private:
+ mutable std::mutex mutex_;  // mutable so that const Empty() can lock it
+ std::condition_variable cond_;  // notified by PushUnlocked(); waited on by Pop()
+};
+
+template <typename T>
+class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+ private:
+ struct DoHandle {  // captures the size on construction, reports the change on destruction
+ explicit DoHandle(BackpressureConcurrentQueue& queue)
+ : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+ ~DoHandle() {
+ // unsynced access is safe since DoHandle is internally only used when the
+ // lock is held
+ size_t end_size = queue_.UnsyncSize();
+ queue_.handler_.Handle(start_size_, end_size);
+ }
+
+ BackpressureConcurrentQueue& queue_;
+ size_t start_size_;
+ };
+
+ public:
+ explicit BackpressureConcurrentQueue(BackpressureHandler handler)
+ : handler_(std::move(handler)) {}
+
+ T Pop() {  // NOTE(review): unlike ConcurrentQueue::Pop, does not wait for a non-empty queue
+ std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+ DoHandle do_handle(*this);
+ return ConcurrentQueue<T>::PopUnlocked();
+ }
+
+ void Push(const T& item) {
+ std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+ DoHandle do_handle(*this);
+ ConcurrentQueue<T>::PushUnlocked(item);
+ }
+
+ void Clear() {
+ std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+ DoHandle do_handle(*this);
+ ConcurrentQueue<T>::ClearUnlocked();
+ }
+
+ std::optional<T> TryPop() {
+ std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+ DoHandle do_handle(*this);
+ return ConcurrentQueue<T>::TryPopUnlocked();
+ }
+
+ Status ForceShutdown() { return handler_.ForceShutdown(); }  // forwards to the handler
+
+ private:
+ BackpressureHandler handler_;  // receives (start_size, end_size) after each operation above
+};
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 541e5fed62..97119726d4 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -1114,6 +1114,7 @@ void RegisterAggregateNode(ExecFactoryRegistry*);
void RegisterSinkNode(ExecFactoryRegistry*);
void RegisterHashJoinNode(ExecFactoryRegistry*);
void RegisterAsofJoinNode(ExecFactoryRegistry*);
+void RegisterSortedMergeNode(ExecFactoryRegistry*);
} // namespace internal
@@ -1132,6 +1133,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
internal::RegisterSinkNode(this);
internal::RegisterHashJoinNode(this);
internal::RegisterAsofJoinNode(this);
+ internal::RegisterSortedMergeNode(this);
}
Result<Factory> GetFactory(const std::string& factory_name) override {
diff --git a/cpp/src/arrow/acero/sorted_merge_node.cc b/cpp/src/arrow/acero/sorted_merge_node.cc
new file mode 100644
index 0000000000..f3b934eda1
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node.cc
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <any>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue_internal.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+// Minimal scope guard: runs the stored callable when the guard goes out of
+// scope, regardless of how the enclosing scope is left.
+template <typename Callable>
+struct Defer {
+  Callable callable;
+
+  explicit Defer(Callable on_scope_exit) : callable(std::move(on_scope_exit)) {}
+
+  ~Defer() noexcept {
+    callable();
+  }
+};
+
+// Produces one label per input, of the form "input_<i>_label".
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels;
+  labels.reserve(inputs.size());
+  for (size_t input_idx = 0; input_idx < inputs.size(); ++input_idx) {
+    labels.push_back("input_" + std::to_string(input_idx) + "_label");
+  }
+  return labels;
+}
+
+// Linear search over a whole container; returns end() when `val` is absent.
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  const auto first = container.begin();
+  const auto last = container.end();
+  return std::find(first, last, val);
+}
+
+// True when `val` occurs at least once in `container`.
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  const auto hit = std_find(container, val);
+  return hit != container.end();
+}
+
+} // namespace
+
+namespace arrow::acero {
+
+namespace {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using SingleRecordBatchSliceBuilder = arrow::acero::UnmaterializedSliceBuilder<1>;
+using SingleRecordBatchCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+constexpr bool kNewTask = true;
+constexpr bool kPoisonPill = false;
+
+// Forwards pause/resume requests from a queue's backpressure handler to
+// `node`, tagging each request with the next value of a shared counter.
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override {
+    const int32_t sequence = ++backpressure_counter_;
+    node_->PauseProducing(output_, sequence);
+  }
+
+  void Resume() override {
+    const int32_t sequence = ++backpressure_counter_;
+    node_->ResumeProducing(output_, sequence);
+  }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  // Shared with the other controllers created from the same counter.
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState corresponds to a single input. Input record batches are queued up in
+/// InputState until they are processed and turned into output record batches.
+class InputState {
+ public:
+ // Builds the state for input number `index`. `handler` is moved into the
+ // internal queue; `time_col_index` is the position of the sort column in
+ // `schema`, whose type id is cached for later lookups.
+ InputState(size_t index, BackpressureHandler handler,
+            const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+     : index_(index),
+       queue_(std::move(handler)),
+       schema_(schema),
+       time_col_index_(time_col_index),
+       time_type_id_(schema_->field(time_col_index_)->type()->id()) {}
+
+ // Factory: creates the backpressure handler for `input` (thresholds 4 / 8)
+ // and wraps a new InputState in the requested smart-pointer type.
+ template <typename PtrType>
+ static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                    arrow::acero::ExecNode* output,
+                                    std::atomic<int32_t>& backpressure_counter,
+                                    const std::shared_ptr<arrow::Schema>& schema,
+                                    const col_index_t time_col_index) {
+   constexpr size_t low_threshold = 4;
+   constexpr size_t high_threshold = 8;
+
+   std::unique_ptr<arrow::acero::BackpressureControl> controller =
+       std::make_unique<BackpressureController>(input, output, backpressure_counter);
+   ARROW_ASSIGN_OR_RAISE(
+       auto queue_handler,
+       BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                 std::move(controller)));
+
+   return PtrType(
+       new InputState(index, std::move(queue_handler), schema, time_col_index));
+ }
+
+ // Whether column `i` of the input schema is the sort ("time") column.
+ bool IsTimeColumn(col_index_t i) const {
+   DCHECK_LT(i, schema_->num_fields());
+   return time_col_index_ == i;
+ }
+
+ // Index of the current row inside the front batch (only meaningful while the
+ // queue is not empty).
+ row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+ // A non-zero row index implies a batch is in progress, which lets us skip
+ // the comparatively slow locked check on the queue.
+ bool Empty() const { return latest_ref_row_ == 0 && queue_.Empty(); }
+
+ // Position of this input among the node's inputs.
+ size_t index() const { return index_; }
+
+ // Batch count reported for this input so far.
+ int total_batches() const { return total_batches_.load(); }
+
+ // Front batch of the queue (precondition: the queue must not be empty).
+ const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+   return queue_.UnsyncFront();
+ }
+
+ // Sort-key value of the current row (latest_ref_row_) of the front batch,
+ // converted to an unsigned 64-bit value by GetTime().
+ // Precondition: the queue must not be empty.
+ //
+ // The LATEST_VAL_CASE macro that used to be defined and immediately
+ // undefined around this method was never expanded here and has been dropped.
+ inline time_unit_t GetLatestTime() const {
+   return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                  latest_ref_row_);
+ }
+
+ // True once the number of consumed batches equals the announced total.
+ bool Finished() const { return batches_processed_ == total_batches_; }
+
+ // Consumes the run of rows at the front of this input that share the current
+ // sort-key value and records it in `builder` as a single entry
+ // (batch, start, end), where `end` is one past the last consumed row.
+ // The run never crosses a batch boundary: when the end of the front batch is
+ // reached the batch is counted as processed and popped from the queue.
+ // Does nothing when there is no pending input.
+ void Advance(SingleRecordBatchSliceBuilder& builder) {
+ // A non-zero row index means a batch is already in progress, so the queue
+ // lock only has to be taken when we are at the start of a batch.
+ bool active =
+ (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+ if (!active) {
+ return;
+ }
+
+ row_index_t start = latest_ref_row_;
+ row_index_t end = latest_ref_row_;
+ time_unit_t startTime = GetLatestTime();
+ // Keep our own reference: the batch may be popped from the queue below but
+ // is still needed for AddEntry().
+ std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+ auto rows_in_batch = (row_index_t)batch->num_rows();
+
+ while (GetLatestTime() == startTime) {
+ end = ++latest_ref_row_;
+ if (latest_ref_row_ >= rows_in_batch) {
+ // hit the end of the batch, need to get the next batch if
+ // possible.
+ ++batches_processed_;
+ latest_ref_row_ = 0;
+ // NOTE(review): TryPop() returns a std::optional, so `!queue_.TryPop()`
+ // is false whenever a batch was actually popped. `active` then becomes
+ // false and the sanity check below presumably never runs -- confirm
+ // whether the intent was to test for a *following* batch instead.
+ active &= !queue_.TryPop();
+ if (active) {
+ DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+ 0); // empty batches disallowed, sanity check
+ }
+ break;
+ }
+ }
+ builder.AddEntry(batch, start, end);
+ }
+
+ // Queues a batch for merging. Empty batches are never enqueued; they are
+ // counted as processed right away so that Finished() can still become true.
+ arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+   const bool has_rows = rb->num_rows() > 0;
+   if (!has_rows) {
+     ++batches_processed_;
+     return arrow::Status::OK();
+   }
+   queue_.Push(rb);
+   return arrow::Status::OK();
+ }
+
+ // Schema shared by every batch of this input.
+ const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+ // Records how many batches this input will deliver in total.
+ void set_total_batches(int n) { total_batches_.store(n); }
+
+ private:
+ // Position of this input among the node's inputs
+ size_t index_;
+ // Pending record batches. The latest is the front. Batches cannot be empty.
+ BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+ // Schema associated with the input
+ std::shared_ptr<arrow::Schema> schema_;
+ // Total number of batches (only int because InputFinished uses int);
+ // stays at -1 until set_total_batches() is called
+ std::atomic<int> total_batches_{-1};
+ // Number of batches processed so far (only int because InputFinished uses
+ // int)
+ std::atomic<int> batches_processed_{0};
+ // Index of the time col
+ col_index_t time_col_index_;
+ // Type id of the time column
+ arrow::Type::type time_type_id_;
+ // Index of the latest row reference within the front batch. If > 0 then
+ // queue_ cannot be empty. Must be < queue_.front()->num_rows() if queue_ is
+ // non-empty.
+ row_index_t latest_ref_row_ = 0;
+ // (The former `latest_time_` member was never read or written by any
+ // method of this class and has been removed.)
+};
+
+// Ordering used for the merge heap: `lhs` sorts after `rhs` when its current
+// sort-key value is strictly greater, so the standard (max-)heap algorithms
+// surface the input with the smallest key. A finished input never compares
+// "greater" on either side.
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    if (lhs->Finished() || rhs->Finished()) {
+      return false;
+    }
+    const time_unit_t lhs_time = lhs->GetLatestTime();
+    const time_unit_t rhs_time = rhs->GetLatestTime();
+    return rhs_time < lhs_time;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+ static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+ // Creates the node with one row counter pair per input. The processing
+ // thread is left unstarted until StartProducing().
+ SortedMergeNode(arrow::acero::ExecPlan* plan,
+                 std::vector<arrow::acero::ExecNode*> inputs,
+                 std::shared_ptr<arrow::Schema> output_schema,
+                 arrow::Ordering new_ordering)
+     : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+       ordering_(std::move(new_ordering)),
+       input_counter(inputs_.size()),
+       output_counter(inputs_.size()),
+       process_thread() {
+   SetLabel("sorted_merge");
+ }
+
+ // Wakes the processing thread with a poison pill and waits for it to exit.
+ ~SortedMergeNode() override {
+   process_queue.Push(kPoisonPill);
+   // A node may be created only temporarily (such as to inspect the output
+   // schema) without ever starting the thread; then there is nothing to join.
+   const bool thread_was_started = process_thread.joinable();
+   if (thread_was_started) {
+     process_thread.join();
+   }
+ }
+
+ // Factory registered as "sorted_merge". Requires at least one input, all
+ // inputs to share one schema, and `options` to be OrderByNodeOptions
+ // carrying an explicit, non-empty ordering.
+ static arrow::Result<arrow::acero::ExecNode*> Make(
+     arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+     const arrow::acero::ExecNodeOptions& options) {
+   const int num_inputs = static_cast<int>(inputs.size());
+   RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, num_inputs, "SortedMergeNode"));
+
+   if (inputs.size() < 1) {
+     return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+   }
+
+   // The first input's schema is both the reference for the equality check
+   // and the schema of the node's output.
+   std::shared_ptr<Schema> output_schema = inputs.at(0)->output_schema();
+   for (const auto& candidate : inputs) {
+     const auto& candidate_schema = candidate->output_schema();
+     if (candidate_schema->Equals(output_schema)) {
+       continue;
+     }
+     return Status::Invalid(
+         "SortedMergeNode input schemas must all match, first schema was: ",
+         output_schema->ToString(), " got schema: ", candidate_schema->ToString());
+   }
+
+   const auto& order_options =
+       arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+   const auto& requested = order_options.ordering;
+   if (requested.is_implicit() || requested.is_unordered()) {
+     return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+   }
+
+   return plan->EmplaceNode<SortedMergeNode>(plan, std::move(inputs),
+                                             std::move(output_schema), requested);
+ }
+
+ // Name of this node type.
+ const char* kind_name() const override { return "SortedMergeNode"; }
+
+ // Ordering of the merged output, as passed in at construction.
+ const arrow::Ordering& ordering() const override { return ordering_; }
+
+ /// Validates the requested ordering and creates one InputState per input.
+ ///
+ /// Returns NotImplemented when the ordering has more than one sort key or is
+ /// not ascending, and Invalid when the sort key cannot be resolved to a
+ /// single top-level column of a type that GetTime() can read. These are
+ /// user-input errors, so they are reported as a Status rather than by
+ /// aborting the process.
+ arrow::Status Init() override {
+   const auto& sort_keys = ordering_.sort_keys();
+   if (sort_keys.size() != 1) {
+     return Status::NotImplemented("Only one sort key supported");
+   }
+
+   // The ordering is the same for every input, so check it once up front.
+   const auto& sort_key = sort_keys[0];
+   if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+     return Status::NotImplemented("Only ascending sort order is supported");
+   }
+   const FieldRef& ref = sort_key.target;
+
+   // Mirrors the type ids handled by GetTime(); any other type would hit its
+   // default branch, which yields 0 for every row in release builds.
+   const auto is_supported_key_type = [](arrow::Type::type id) {
+     switch (id) {
+       case arrow::Type::INT8:
+       case arrow::Type::INT16:
+       case arrow::Type::INT32:
+       case arrow::Type::INT64:
+       case arrow::Type::UINT8:
+       case arrow::Type::UINT16:
+       case arrow::Type::UINT32:
+       case arrow::Type::UINT64:
+       case arrow::Type::DATE32:
+       case arrow::Type::DATE64:
+       case arrow::Type::TIME32:
+       case arrow::Type::TIME64:
+       case arrow::Type::TIMESTAMP:
+         return true;
+       default:
+         return false;
+     }
+   };
+
+   const auto& inputs = this->inputs();
+   state.reserve(inputs.size());
+   for (size_t i = 0; i < inputs.size(); i++) {
+     ExecNode* input = inputs[i];
+     const auto& schema = input->output_schema();
+
+     auto match_res = ref.FindOne(*schema);
+     if (!match_res.ok()) {
+       return Status::Invalid("Bad sort key : ", match_res.status().message());
+     }
+     ARROW_ASSIGN_OR_RAISE(auto match, match_res);
+     if (match.indices().size() != 1) {
+       return Status::Invalid(
+           "Bad sort key : the sort key must refer to a top-level column");
+     }
+
+     const col_index_t time_col_index = match.indices()[0];
+     const auto& key_type = schema->field(time_col_index)->type();
+     if (!is_supported_key_type(key_type->id())) {
+       return Status::Invalid("Bad sort key : unsupported sort key type ",
+                              key_type->ToString());
+     }
+
+     ARROW_ASSIGN_OR_RAISE(auto input_state,
+                           InputState::Make<std::shared_ptr<InputState>>(
+                               i, input, this, backpressure_counter, schema,
+                               time_col_index));
+     state.push_back(std::move(input_state));
+   }
+   return Status::OK();
+ }
+
+ // Converts the incoming batch to a RecordBatch, queues it on the state of
+ // the input it came from, and wakes the processing thread.
+ arrow::Status InputReceived(arrow::acero::ExecNode* input,
+                             arrow::ExecBatch batch) override {
+   ARROW_DCHECK(std_has(inputs_, input));
+   const auto input_pos = std_find(inputs_, input);
+   const size_t input_index = input_pos - inputs_.begin();
+
+   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
+                         batch.ToRecordBatch(output_schema_));
+
+   // No node-level lock is taken here: the InputState's queue does its own
+   // locking.
+   input_counter[input_index] += record_batch->num_rows();
+   ARROW_RETURN_NOT_OK(state[input_index]->Push(record_batch));
+   process_queue.Push(kNewTask);
+   return Status::OK();
+ }
+
+ // Records the final batch count of `input` and schedules one more
+ // processing pass so that already-queued data is flushed.
+ arrow::Status InputFinished(arrow::acero::ExecNode* input, int total_batches) override {
+   ARROW_DCHECK(std_has(inputs_, input));
+   {
+     std::lock_guard<std::mutex> guard(gate);
+     ARROW_DCHECK(std_has(inputs_, input));
+     const auto input_pos = std_find(inputs_, input);
+     const size_t input_index = input_pos - inputs_.begin();
+     state.at(input_index)->set_total_batches(total_batches);
+   }
+   // Trigger a final process call for stragglers
+   process_queue.Push(kNewTask);
+   return Status::OK();
+ }
+
+ // Registers the processing thread as an external task of the plan and
+ // launches it. If the plan has already aborted no thread is started.
+ arrow::Status StartProducing() override {
+   ARROW_ASSIGN_OR_RAISE(process_task, plan_->query_context()->BeginExternalTask(
+                                           "SortedMergeNode::ProcessThread"));
+   const bool plan_still_running = process_task.is_valid();
+   if (plan_still_running) {
+     process_thread = std::thread(&SortedMergeNode::StartPoller, this);
+   }
+   return Status::OK();
+ }
+
+ // Discards pending wake-ups and asks the processing thread to stop.
+ arrow::Status StopProducingImpl() override {
+   process_queue.Clear();
+   process_queue.Push(kPoisonPill);
+   return Status::OK();
+ }
+
+ // Intentionally empty: pausing and resuming are handled by the backpressure
+ // controller.
+ void PauseProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+ void ResumeProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+
+ protected:
+ // Extra text for the node's string form: the ordering it merges on.
+ std::string ToStringExtra(int indent) const override {
+   std::string extra = "ordering=";
+   extra += ordering_.ToString();
+   return extra;
+ }
+
+ private:
+ /// Finishes the node from the processing thread: tells the output how many
+ /// batches were produced (when `st` is OK) and marks the external task as
+ /// finished with `st`.
+ ///
+ /// Only the first call has any effect. Previously `cleanup_started` was
+ /// checked but never set, so a second call would mark the task finished
+ /// twice.
+ void EndFromProcessThread(arrow::Status st = arrow::Status::OK()) {
+   if (cleanup_started.exchange(true)) {
+     return;
+   }
+
+   // The row accounting is only expected to balance after a complete and
+   // successful run. When the plan is stopped early or an error is being
+   // reported, rows legitimately remain unconsumed, and aborting the process
+   // here would hide the real outcome.
+   bool all_inputs_finished = true;
+   for (const auto& s : state) {
+     all_inputs_finished = all_inputs_finished && s->Finished();
+   }
+   if (st.ok() && all_inputs_finished) {
+     for (size_t i = 0; i < input_counter.size(); ++i) {
+       ARROW_CHECK(input_counter[i] == output_counter[i])
+           << input_counter[i] << " != " << output_counter[i];
+     }
+   }
+
+   // The notification runs on the plan's executor rather than on the
+   // processing thread.
+   arrow::Status spawn_status =
+       plan_->query_context()->executor()->Spawn([this, st = std::move(st)]() mutable {
+         // Mark the task finished no matter how the block below exits.
+         Defer cleanup([this, &st]() { process_task.MarkFinished(st); });
+         if (st.ok()) {
+           st = output_->InputFinished(this, batches_produced);
+         }
+       });
+   if (!spawn_status.ok()) {
+     // The task above will never run; finish here so the plan is not left
+     // waiting on process_task forever.
+     process_task.MarkFinished(std::move(spawn_status));
+   }
+ }
+
+ // Returns true while at least one input still has batches outstanding.
+ // Once every input is finished, ends the node and returns false.
+ bool CheckEnded() {
+   for (const auto& input_state : state) {
+     if (!input_state->Finished()) {
+       return true;
+     }
+   }
+   EndFromProcessThread();
+   return false;
+ }
+
+ /// Streams the input states in sorted order until we run out of input
+ arrow::Result<std::shared_ptr<arrow::RecordBatch>> getNextBatch() {
+ DCHECK(!state.empty());
+ for (const auto& s : state) {
+ if (s->Empty() && !s->Finished()) {
+ return nullptr; // not enough data, wait
+ }
+ }
+
+ std::vector<std::shared_ptr<InputState>> heap = state;
+ // filter out finished states
+ heap.erase(std::remove_if(
+ heap.begin(), heap.end(),
+ [](const std::shared_ptr<InputState>& s) { return s->Finished(); }),
+ heap.end());
+
+ // If any are Empty(), then return early since we don't have enough data
+ if (std::any_of(heap.begin(), heap.end(),
+ [](const std::shared_ptr<InputState>& s) { return s->Empty(); })) {
+ return nullptr;
+ }
+
+ // Currently we only support one sort key
+ const auto sort_col = *ordering_.sort_keys().at(0).target.name();
+ const auto comp = InputStateComparator();
+ std::make_heap(heap.begin(), heap.end(), comp);
+
+ // Each slice only has one record batch with the same schema as the output
+ std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+ for (int i = 0; i < output_schema_->num_fields(); i++) {
+ output_col_to_src[i] = std::make_pair(0, i);
+ }
+ SingleRecordBatchCompositeTable output(output_schema(), 1,
+ std::move(output_col_to_src),
+ plan()->query_context()->memory_pool());
+
+ // Generate rows until we run out of data or we exceed the target output
+ // size
+ bool waiting_for_more_data = false;
+ while (!waiting_for_more_data && !heap.empty() &&
+ output.Size() < kTargetOutputBatchSize) {
+ std::pop_heap(heap.begin(), heap.end(), comp);
+
+ auto& next_item = heap.back();
+ time_unit_t latest_time = std::numeric_limits<time_unit_t>::min();
+ time_unit_t new_time = next_item->GetLatestTime();
+ ARROW_CHECK(new_time >= latest_time)
+ << "Input state " << next_item->index()
+ << " has out of order data. newTime=" << new_time
+ << " latestTime=" << latest_time;
+
+ latest_time = new_time;
+ SingleRecordBatchSliceBuilder builder{&output};
+ next_item->Advance(builder);
+
+ if (builder.Size() > 0) {
+ output_counter[next_item->index()] += builder.Size();
+ builder.Finalize();
+ }
+ if (next_item->Finished()) {
+ heap.pop_back();
+ } else if (next_item->Empty()) {
+ // We've run out of data on one of the inputs
+ waiting_for_more_data = true;
+ continue; // skip the unnecessary make_heap
+ }
+ std::make_heap(heap.begin(), heap.end(), comp);
+ }
+
+ // Emit the batch
+ if (output.Size() == 0) {
+ return nullptr;
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto maybe_rb, output.Materialize());
+ return maybe_rb.value_or(nullptr);
+ }
+ /// Gets a batch. Returns true if there is more data to process, false if we
+ /// are done or an error occurred
+ /// Produces as many output batches as the queued input allows.
+ ///
+ /// Returns true if there is more data to process later, false if the node
+ /// has ended (all inputs finished) or an error occurred. On error the node
+ /// is ended with that status so that it reaches the plan instead of
+ /// terminating the process.
+ bool PollOnce() {
+   std::lock_guard<std::mutex> guard(gate);
+   if (!CheckEnded()) {
+     return false;
+   }
+
+   // Process batches while we have data
+   for (;;) {
+     Result<std::shared_ptr<RecordBatch>> result = getNextBatch();
+     if (!result.ok()) {
+       EndFromProcessThread(result.status());
+       return false;
+     }
+
+     auto out_rb = *result;
+     if (!out_rb) {
+       break;  // nothing more can be produced right now
+     }
+
+     ExecBatch out_b(*out_rb);
+     out_b.index = batches_produced++;
+     Status st = output_->InputReceived(this, std::move(out_b));
+     if (!st.ok()) {
+       // Log without aborting, then stop: a FATAL log would end the process
+       // before the status could be handed to EndFromProcessThread.
+       ARROW_LOG(ERROR) << "Error in output_::InputReceived: " << st.ToString();
+       EndFromProcessThread(std::move(st));
+       return false;
+     }
+   }
+
+   // Report to the output the total batch count, if we've already
+   // finished everything (there are two places where this can happen:
+   // here and InputFinished)
+   //
+   // It may happen here in cases where InputFinished was called before
+   // we were finished producing results (so we didn't know the output
+   // size at that time)
+   if (!CheckEnded()) {
+     return false;
+   }
+
+   // There is no more we can do now but there is still work remaining
+   // for later when more data arrives.
+   return true;
+ }
+
+ /// Main loop of the processing thread: waits for a wake-up on
+ /// process_queue and runs one processing pass per wake-up until the node
+ /// ends or a poison pill arrives.
+ void EmitBatches() {
+   while (true) {
+     // Implementation note: If the queue is empty, we will block here
+     if (process_queue.Pop() == kPoisonPill) {
+       EndFromProcessThread();
+       // Stop here: falling through into PollOnce() would keep processing
+       // after the node has been ended and could end it a second time.
+       return;
+     }
+     // Either we're out of data or something went wrong
+     if (!PollOnce()) {
+       return;
+     }
+   }
+ }
+
+ /// The entry point for processThread
+ static void StartPoller(SortedMergeNode* node) { node->EmitBatches(); }
+
+ // Ordering of the merged output
+ arrow::Ordering ordering_;
+
+ // Each input state corresponds to an input (e.g. a parquet data file)
+ std::vector<std::shared_ptr<InputState>> state;
+ // Rows received from / emitted for each input, indexed like `state`
+ std::vector<std::atomic_int64_t> input_counter;
+ std::vector<std::atomic_int64_t> output_counter;
+ // Serializes processing passes with InputFinished
+ std::mutex gate;
+
+ std::atomic<bool> cleanup_started{false};
+
+ // Backpressure counter common to all input states. Explicitly zeroed: a
+ // default-constructed std::atomic holds an indeterminate value before
+ // C++20, and the controllers pre-increment it on first use.
+ std::atomic<int32_t> backpressure_counter{0};
+
+ std::atomic<int32_t> batches_produced{0};
+
+ // Queue to trigger processing of a given input. False acts as a poison pill
+ ConcurrentQueue<bool> process_queue;
+ // Once StartProducing is called, we initialize this thread to poll the
+ // input states and emit batches
+ std::thread process_thread;
+ arrow::Future<> process_task;
+
+ // (The former members counter_, accumulation_queue_, mutex_ and
+ // total_batches_ were not referenced anywhere in this class and have been
+ // removed.)
+};
+
+} // namespace
+
+namespace internal {
+// Makes the node available to declarations under the name "sorted_merge".
+void RegisterSortedMergeNode(ExecFactoryRegistry* registry) {
+  DCHECK_OK(registry->AddFactory(/*factory_name=*/"sorted_merge",
+                                 /*factory=*/SortedMergeNode::Make));
+}
+} // namespace internal
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/sorted_merge_node_test.cc b/cpp/src/arrow/acero/sorted_merge_node_test.cc
new file mode 100644
index 0000000000..55446d631d
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node_test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/test_nodes.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/compute/ordering.h"
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::acero {
+
+// Generates a table with a signed integer "timestamp" column that starts at
+// `start` and grows by `step` per row, plus a random utf8 "str" column.
+std::shared_ptr<Table> TestTable(int start, int step, int rows_per_batch,
+                                 int num_batches) {
+  auto generator = gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)},
+                             {"str", gen::Random(utf8())}});
+  return generator->FailOnError()->Table(rows_per_batch, num_batches);
+}
+
+// Merges three sorted tables with different batch layouts and verifies that
+// the timestamp column of the result is globally sorted.
+TEST(SortedMergeNode, Basic) {
+  // 0, 2, ..., 10 in three batches of two rows
+  auto evens = TestTable(/*start=*/0, /*step=*/2, /*rows_per_batch=*/2,
+                         /*num_batches=*/3);
+  // 1, 3, ..., 11 in two batches of three rows
+  auto odds = TestTable(/*start=*/1, /*step=*/2, /*rows_per_batch=*/3,
+                        /*num_batches=*/2);
+  // 3, 6, ..., 18 in a single batch of six rows
+  auto threes = TestTable(/*start=*/3, /*step=*/3, /*rows_per_batch=*/6,
+                          /*num_batches=*/1);
+
+  std::vector<Declaration::Input> src_decls;
+  for (const auto& table : {evens, odds, threes}) {
+    src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table)));
+  }
+
+  auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}));
+  Declaration sorted_merge{"sorted_merge", src_decls, ops};
+
+  // We can't use threads for sorted merging since it relies on
+  // ascending deterministic order of timestamps
+  ASSERT_OK_AND_ASSIGN(auto merged,
+                       DeclarationToTable(sorted_merge, /*use_threads=*/false));
+  ASSERT_EQ(merged->num_rows(), 18);
+
+  ASSERT_OK_AND_ASSIGN(auto expected_ts_builder,
+                       MakeBuilder(int32(), default_memory_pool()));
+  for (auto ts : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) {
+    ASSERT_OK(expected_ts_builder->AppendScalar(*MakeScalar(ts)));
+  }
+  ASSERT_OK_AND_ASSIGN(auto expected_ts, expected_ts_builder->Finish());
+
+  auto merged_ts_chunks = merged->column(0);
+  ASSERT_OK_AND_ASSIGN(auto merged_ts, Concatenate(merged_ts_chunks->chunks()));
+
+  AssertArraysEqual(*expected_ts, *merged_ts);
+}
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/time_series_util.cc b/cpp/src/arrow/acero/time_series_util.cc
new file mode 100644
index 0000000000..71133fef47
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/data.h"
+
+#include "arrow/acero/time_series_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+// Maps an integral value onto uint64_t such that the mapping is strictly
+// increasing over the whole range of T.
+//
+// For signed T the value is shifted by 2^(bits-1): the smallest value of T
+// maps to 0 and the largest to 2^bits - 1. Unsigned values map to themselves.
+// The shift has to be applied to non-negative values too; applying it only to
+// negative ones (as before) made e.g. -1 compare greater than 0.
+// The addition is done in uint64_t, where wrap-around is well defined.
+template <typename T, enable_if_t<std::is_integral<T>::value, bool>>
+inline uint64_t NormalizeTime(T t) {
+  constexpr uint64_t bias =
+      std::is_signed<T>::value ? static_cast<uint64_t>(1) << (8 * sizeof(T) - 1) : 0;
+  return static_cast<uint64_t>(t) + bias;
+}
+
+// Reads row `row` of column `col` of `batch`, interpreting the values buffer
+// according to `time_type`, and returns the value passed through
+// NormalizeTime(). Type ids without a case below trip a debug check and
+// yield 0.
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row) {
+// Expands to the switch case for one type id: look up the C type of that
+// Arrow type and read the requested row from buffer 1.
+#define TIME_VALUE_CASE(type_id)                                     \
+  case Type::type_id: {                                              \
+    using ArrowType = typename TypeIdTraits<Type::type_id>::Type;    \
+    using ValueType = typename TypeTraits<ArrowType>::CType;         \
+    return NormalizeTime(column->GetValues<ValueType>(1)[row]);      \
+  }
+
+  auto column = batch->column_data(col);
+  switch (time_type) {
+    TIME_VALUE_CASE(INT8)
+    TIME_VALUE_CASE(INT16)
+    TIME_VALUE_CASE(INT32)
+    TIME_VALUE_CASE(INT64)
+    TIME_VALUE_CASE(UINT8)
+    TIME_VALUE_CASE(UINT16)
+    TIME_VALUE_CASE(UINT32)
+    TIME_VALUE_CASE(UINT64)
+    TIME_VALUE_CASE(DATE32)
+    TIME_VALUE_CASE(DATE64)
+    TIME_VALUE_CASE(TIME32)
+    TIME_VALUE_CASE(TIME64)
+    TIME_VALUE_CASE(TIMESTAMP)
+    default:
+      DCHECK(false);
+      return 0;  // cannot happen
+  }
+
+#undef TIME_VALUE_CASE
+}
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/time_series_util.h b/cpp/src/arrow/acero/time_series_util.h
new file mode 100644
index 0000000000..97707f43bf
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// normalize the value to unsigned 64-bits while preserving ordering of values
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+uint64_t NormalizeTime(T t);
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row);
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h
new file mode 100644
index 0000000000..05d6c86693
--- /dev/null
+++ b/cpp/src/arrow/acero/unmaterialized_table.h
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <optional>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+/// A cell refers to the half-open row range [start, end) of one record batch.
+/// The struct is a plain aggregate: it is created with brace initialization and
+/// unpacked with structured bindings, so the order of the fields matters.
+struct CompositeEntry {
+ /// Non-owning pointer to the source batch. May be null, in which case
+ /// materialization emits null values for the rows of this cell.
+ RecordBatch* batch;
+ /// Index of the first row of the range (inclusive).
+ uint64_t start;
+ /// Index one past the last row of the range (exclusive).
+ uint64_t end;
+};
+
+// Forward declare the builder so that UnmaterializedCompositeTable can name it
+// in a friend declaration before the builder itself is defined.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows. Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented. Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  /// \brief Create an empty table.
+  ///
+  /// \param output_schema schema of the record batch produced by Materialize()
+  /// \param num_composite_tables number of source tables (stored only; the code
+  ///        in this class does not consult it)
+  /// \param output_col_to_src_ maps an output column index to a
+  ///        (source table index, source column index) pair
+  /// \param pool_ memory pool handed to the array builders in Materialize()
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() const { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  /// Total number of rows added so far (the sum of the sizes of all slices).
+  inline size_t Size() const { return num_rows; }
+  /// True when no rows have been added.
+  inline bool Empty() const { return num_rows == 0; }
+
+  /// \brief Build a record batch holding every row added so far.
+  ///
+  /// \return std::nullopt when the table has no rows, otherwise a batch with
+  ///         Size() rows laid out according to the output schema. Returns
+  ///         Status::Invalid when a field has an unsupported type or when an
+  ///         output column has no usable source mapping.
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), static_cast<uint64_t>(std::numeric_limits<int64_t>::max()));
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id) \
+  case arrow::Type::id: { \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break; \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, static_cast<int64_t>(num_rows), arrays);
+    return r;
+  }
+
+ private:
+  /// One horizontal run of rows: for each source table, the row range that
+  /// contributes to this run. All components are expected to span the same
+  /// number of rows, so the first one defines the size of the slice.
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Schema of the materialized output
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  // Mapping from an output column ID to a source table ID and column ID
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  /// Keep `ref` alive for as long as this table lives.
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    ptr2Ref[reinterpret_cast<uintptr_t>(ref.get())] = ref;
+  }
+
+  /// Append a copy of `slice` and account for its rows.
+  void AddSlice(const UnmaterializedSlice& slice) {
+    slices.push_back(slice);
+    num_rows += slice.Size();
+  }
+
+  /// Append logical row `row` of a boolean column to `builder`.
+  /// The builder must already have room reserved (Unsafe* appends are used).
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  static enable_if_boolean<Type, Status> BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    // Boolean values are bit-packed, so the array offset is a number of bits.
+    // GetValues<uint8_t>(1) would advance the pointer by `offset` *bytes*; take
+    // the start of the buffer instead and add the offset to the bit index.
+    const uint8_t* bitmap = source->template GetValues<uint8_t>(1, 0);
+    builder.UnsafeAppend(bit_util::GetBit(bitmap, source->offset + row));
+    return Status::OK();
+  }
+
+  /// Append logical row `row` of a fixed-width, non-boolean column to `builder`.
+  /// The builder must already have room reserved (Unsafe* appends are used).
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  static enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+                     Status>
+  BuilderAppend(Builder& builder, const std::shared_ptr<ArrayData>& source,
+                uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    using CType = typename TypeTraits<Type>::CType;
+    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+    return Status::OK();
+  }
+
+  /// Append logical row `row` of a (large) string/binary column to `builder`.
+  /// Uses the checked Append because the value bytes are not pre-reserved.
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  static enable_if_base_binary<Type, Status> BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      return builder.AppendNull();
+    }
+    using offset_type = typename Type::offset_type;
+    // The offsets pointer is adjusted by the array offset; the offsets
+    // themselves index the value buffer from its start.
+    const uint8_t* data = source->buffers[2]->data();
+    const offset_type* offsets = source->GetValues<offset_type>(1);
+    const offset_type offset0 = offsets[row];
+    const offset_type offset1 = offsets[row + 1];
+    return builder.Append(data + offset0, offset1 - offset0);
+  }
+
+  /// \brief Build output column `i_col` by walking every slice in order.
+  ///
+  /// \param type output type of the column, used to create the builder
+  /// \param i_col index of the output column
+  /// \return the finished array, or Status::Invalid when `i_col` has no source
+  ///         mapping or maps to a table index outside the slice components
+  template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+  arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+      const std::shared_ptr<arrow::DataType>& type, int i_col) {
+    // Look the mapping up without operator[], which would silently insert a
+    // (0, 0) entry for an unmapped column.
+    const auto src_it = output_col_to_src.find(i_col);
+    if (src_it == output_col_to_src.end()) {
+      return arrow::Status::Invalid("No source mapping for output column ", i_col);
+    }
+    const auto& [table_index, column_index] = src_it->second;
+    if (table_index < 0 || static_cast<size_t>(table_index) >= MAX_COMPOSITE_TABLES) {
+      return arrow::Status::Invalid("Source table index ", table_index,
+                                    " out of range for output column ", i_col);
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+    Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+    ARROW_RETURN_NOT_OK(builder.Reserve(static_cast<int64_t>(num_rows)));
+
+    for (const auto& unmaterialized_slice : slices) {
+      const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+      if (batch) {
+        // The column is the same for every row of this slice; fetch it once.
+        const std::shared_ptr<ArrayData>& column = batch->column_data(column_index);
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          ARROW_RETURN_NOT_OK((BuilderAppend<Type, Builder>(builder, column, rowNum)));
+        }
+      } else if (end > start) {
+        // No source batch for this table: the whole range is null.
+        ARROW_RETURN_NOT_OK(builder.AppendNulls(static_cast<int64_t>(end - start)));
+      }
+    }
+    std::shared_ptr<arrow::Array> result;
+    ARROW_RETURN_NOT_OK(builder.Finish(&result));
+    return result;
+  }
+};
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+///
+/// Entries are added one source table at a time with AddEntry() and the
+/// finished slice is handed to the table with Finalize().
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder {
+ public:
+  /// \param table_ table that receives the slice; stored as a raw pointer and
+  ///        dereferenced by AddEntry() and Finalize()
+  explicit UnmaterializedSliceBuilder(
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>* table_)
+      : table(table_) {}
+
+  /// \brief Add the rows [start, end) of `rb` as the next component of the slice.
+  ///
+  /// \param rb source batch; may be null, in which case a null batch pointer is
+  ///        recorded for this component
+  /// \param start first row of the range (inclusive)
+  /// \param end one past the last row of the range (exclusive)
+  ///
+  /// At most MAX_COMPOSITE_TABLES entries fit in one slice and every entry must
+  /// span the same number of rows. Both conditions are checked with DCHECK,
+  /// i.e. in debug builds only.
+  void AddEntry(std::shared_ptr<RecordBatch> rb, uint64_t start, uint64_t end) {
+    // Guard the fixed-size components array against an out-of-bounds write.
+    DCHECK_LT(slice.num_components, MAX_COMPOSITE_TABLES)
+        << "Too many entries for one slice. ";
+    // The sizes below are computed with unsigned subtraction.
+    DCHECK_LE(start, end) << "Slice range should not be reversed. ";
+    if (rb) {
+      // The table owns the batch; the slice only stores the raw pointer.
+      table->AddRecordBatchRef(rb);
+    }
+    if (slice.num_components) {
+      size_t last_index = slice.num_components - 1;
+      DCHECK_EQ(slice.components[last_index].end - slice.components[last_index].start,
+                end - start)
+          << "Slices should be the same length. ";
+    }
+    slice.components[slice.num_components++] = CompositeEntry{rb.get(), start, end};
+  }
+
+  /// Append a copy of the slice built so far to the table. The builder itself
+  /// is not reset.
+  void Finalize() { table->AddSlice(slice); }
+
+  /// Number of rows in the slice, taken from its first entry (0 if none).
+  int64_t Size() const { return slice.Size(); }
+
+ private:
+  using TUnmaterializedCompositeTable =
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>;
+  using TUnmaterializedSlice =
+      typename TUnmaterializedCompositeTable::UnmaterializedSlice;
+
+  TUnmaterializedCompositeTable* table;
+  // Value-initialized so that num_components starts at 0
+  TUnmaterializedSlice slice{};
+};
+
+} // namespace arrow::acero