Posted to commits@arrow.apache.org by we...@apache.org on 2023/11/06 17:48:42 UTC

(arrow) branch main updated: GH-38381: [C++][Acero] Create a sorted merge node (#38380)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b55d8d5664 GH-38381: [C++][Acero] Create a sorted merge node (#38380)
b55d8d5664 is described below

commit b55d8d5664480f63c2be75f0e18eeb006b640427
Author: Jeremy Aguilon <je...@gmail.com>
AuthorDate: Mon Nov 6 12:48:35 2023 -0500

    GH-38381: [C++][Acero] Create a sorted merge node (#38380)
    
    
    
    ### Rationale for this change
    
    This adds a node that can merge N sorted inputs (ascending order only, for a first pass).
    
    Where possible I have shared components with `asof_join_node.cc`.
    
    Full description/use case is described in https://github.com/apache/arrow/issues/38381
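
    For context, the core idea is a k-way merge: repeatedly emit rows from whichever input currently has the smallest sort-key value. Below is a minimal, illustrative sketch of that idea over plain vectors; it is not the node's actual implementation, which operates on record batches, keeps a heap of input states, and handles backpressure:

        // Illustrative sketch only: merge N ascending int64 streams into one.
        #include <cstdint>
        #include <functional>
        #include <queue>
        #include <tuple>
        #include <vector>

        std::vector<int64_t> MergeSorted(const std::vector<std::vector<int64_t>>& inputs) {
          // Heap entry: (current value, input index, offset within that input).
          using Entry = std::tuple<int64_t, size_t, size_t>;
          std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> min_heap;
          for (size_t i = 0; i < inputs.size(); ++i) {
            if (!inputs[i].empty()) min_heap.emplace(inputs[i][0], i, 0);
          }
          std::vector<int64_t> merged;
          while (!min_heap.empty()) {
            auto [value, input, offset] = min_heap.top();
            min_heap.pop();
            merged.push_back(value);
            // Refill the heap from the input we just consumed, if it has more data.
            if (offset + 1 < inputs[input].size()) {
              min_heap.emplace(inputs[input][offset + 1], input, offset + 1);
            }
          }
          return merged;
        }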
    
    ### What changes are included in this PR?
    
    * Extract the relevant internals of asofjoin so that data can be streamed top to bottom and consumed in a non-blocking manner
    * Implement a sorted merger
    
    ### Are these changes tested?
    
    A basic test is added. Locally I have tested this on 100+ gigabytes of Parquet data, sharded across 50+ files. I am happy to add a benchmark on top of the basic test, but I am submitting now for code feedback.
    
    ### Are there any user-facing changes?
    
    Yes, `sorted_merge` is now an exposed declaration.
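
    For illustration, a minimal usage sketch mirroring the added sorted_merge_node_test.cc (the "timestamp" sort column and the helper name are placeholders; the inputs must already be sorted ascending on that column):

        // Illustrative sketch: merge several pre-sorted tables with the new node.
        #include <memory>
        #include <vector>

        #include "arrow/acero/exec_plan.h"
        #include "arrow/acero/options.h"
        #include "arrow/compute/ordering.h"
        #include "arrow/result.h"
        #include "arrow/table.h"

        arrow::Result<std::shared_ptr<arrow::Table>> MergeSortedTables(
            const std::vector<std::shared_ptr<arrow::Table>>& sorted_tables) {
          namespace acero = arrow::acero;
          namespace compute = arrow::compute;

          // One table_source declaration per pre-sorted input.
          std::vector<acero::Declaration::Input> sources;
          for (const auto& table : sorted_tables) {
            sources.emplace_back(
                acero::Declaration("table_source", acero::TableSourceNodeOptions(table)));
          }

          // The ordering names the single (ascending) sort key the inputs share.
          auto opts = acero::OrderByNodeOptions(
              compute::Ordering({compute::SortKey("timestamp")}));
          acero::Declaration merged{"sorted_merge", std::move(sources), std::move(opts)};

          // Threads are disabled so the ascending, deterministic order is preserved.
          return acero::DeclarationToTable(std::move(merged), /*use_threads=*/false);
        }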
    
    Lead-authored-by: Jeremy Aguilon <je...@gmail.com>
    Co-authored-by: jeremy <je...@gmail.com>
    Co-authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/acero/CMakeLists.txt              |   6 +-
 cpp/src/arrow/acero/asof_join_node.cc           | 432 +++--------------
 cpp/src/arrow/acero/backpressure_handler.h      |  74 +++
 cpp/src/arrow/acero/concurrent_queue_internal.h | 161 +++++++
 cpp/src/arrow/acero/exec_plan.cc                |   2 +
 cpp/src/arrow/acero/sorted_merge_node.cc        | 609 ++++++++++++++++++++++++
 cpp/src/arrow/acero/sorted_merge_node_test.cc   |  87 ++++
 cpp/src/arrow/acero/time_series_util.cc         |  63 +++
 cpp/src/arrow/acero/time_series_util.h          |  31 ++
 cpp/src/arrow/acero/unmaterialized_table.h      | 271 +++++++++++
 10 files changed, 1357 insertions(+), 379 deletions(-)

diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index 153413b33c..b77d52a23e 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -49,9 +49,11 @@ set(ARROW_ACERO_SRCS
     project_node.cc
     query_context.cc
     sink_node.cc
+    sorted_merge_node.cc
     source_node.cc
     swiss_join.cc
     task_util.cc
+    time_series_util.cc
     tpch_node.cc
     union_node.cc
     util.cc)
@@ -173,11 +175,13 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
 add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
                      test_nodes.cc)
 
-# asof_join_node uses std::thread internally
+# asof_join_node and sorted_merge_node use std::thread internally
 # and don't use ThreadPool so they will
 # be broken if threading is turned off
 if(ARROW_ENABLE_THREADING)
   add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+  add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc
+                       test_nodes.cc)
 endif()
 
 add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index d19d2db299..4a3b6b199c 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,8 @@
 // under the License.
 
 #include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/backpressure_handler.h"
+#include "arrow/acero/concurrent_queue_internal.h"
 
 #include <atomic>
 #include <condition_variable>
@@ -30,6 +32,7 @@
 
 #include "arrow/acero/exec_plan.h"
 #include "arrow/acero/options.h"
+#include "arrow/acero/unmaterialized_table.h"
 #ifndef NDEBUG
 #include "arrow/acero/options_internal.h"
 #endif
@@ -41,6 +44,7 @@
 #ifndef NDEBUG
 #include "arrow/compute/function_internal.h"
 #endif
+#include "arrow/acero/time_series_util.h"
 #include "arrow/compute/key_hash.h"
 #include "arrow/compute/light_array.h"
 #include "arrow/record_batch.h"
@@ -122,92 +126,12 @@ struct TolType {
 typedef uint64_t row_index_t;
 typedef int col_index_t;
 
-// normalize the value to 64-bits while preserving ordering of values
-template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
-static inline uint64_t time_value(T t) {
-  uint64_t bias = std::is_signed<T>::value ? (uint64_t)1 << (8 * sizeof(T) - 1) : 0;
-  return t < 0 ? static_cast<uint64_t>(t + bias) : static_cast<uint64_t>(t);
-}
-
 // indicates normalization of a key value
 template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
 static inline uint64_t key_value(T t) {
   return static_cast<uint64_t>(t);
 }
 
-/**
- * Simple implementation for an unbound concurrent queue
- */
-template <class T>
-class ConcurrentQueue {
- public:
-  T Pop() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    cond_.wait(lock, [&] { return !queue_.empty(); });
-    return PopUnlocked();
-  }
-
-  T PopUnlocked() {
-    auto item = queue_.front();
-    queue_.pop();
-    return item;
-  }
-
-  void Push(const T& item) {
-    std::unique_lock<std::mutex> lock(mutex_);
-    return PushUnlocked(item);
-  }
-
-  void PushUnlocked(const T& item) {
-    queue_.push(item);
-    cond_.notify_one();
-  }
-
-  void Clear() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    ClearUnlocked();
-  }
-
-  void ClearUnlocked() { queue_ = std::queue<T>(); }
-
-  std::optional<T> TryPop() {
-    std::unique_lock<std::mutex> lock(mutex_);
-    return TryPopUnlocked();
-  }
-
-  std::optional<T> TryPopUnlocked() {
-    // Try to pop the oldest value from the queue (or return nullopt if none)
-    if (queue_.empty()) {
-      return std::nullopt;
-    } else {
-      auto item = queue_.front();
-      queue_.pop();
-      return item;
-    }
-  }
-
-  bool Empty() const {
-    std::unique_lock<std::mutex> lock(mutex_);
-    return queue_.empty();
-  }
-
-  // Un-synchronized access to front
-  // For this to be "safe":
-  // 1) the caller logically guarantees that queue is not empty
-  // 2) pop/try_pop cannot be called concurrently with this
-  const T& UnsyncFront() const { return queue_.front(); }
-
-  size_t UnsyncSize() const { return queue_.size(); }
-
- protected:
-  std::mutex& GetMutex() { return mutex_; }
-
- private:
-  std::queue<T> queue_;
-  mutable std::mutex mutex_;
-  std::condition_variable cond_;
-};
-
 class AsofJoinNode;
 
 #ifndef NDEBUG
@@ -547,104 +471,6 @@ class BackpressureController : public BackpressureControl {
   std::atomic<int32_t>& backpressure_counter_;
 };
 
-class BackpressureHandler {
- private:
-  BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
-                      std::unique_ptr<BackpressureControl> backpressure_control)
-      : input_(input),
-        low_threshold_(low_threshold),
-        high_threshold_(high_threshold),
-        backpressure_control_(std::move(backpressure_control)) {}
-
- public:
-  static Result<BackpressureHandler> Make(
-      ExecNode* input, size_t low_threshold, size_t high_threshold,
-      std::unique_ptr<BackpressureControl> backpressure_control) {
-    if (low_threshold >= high_threshold) {
-      return Status::Invalid("low threshold (", low_threshold,
-                             ") must be less than high threshold (", high_threshold, ")");
-    }
-    if (backpressure_control == NULLPTR) {
-      return Status::Invalid("null backpressure control parameter");
-    }
-    BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
-                                             std::move(backpressure_control));
-    return std::move(backpressure_handler);
-  }
-
-  void Handle(size_t start_level, size_t end_level) {
-    if (start_level < high_threshold_ && end_level >= high_threshold_) {
-      backpressure_control_->Pause();
-    } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
-      backpressure_control_->Resume();
-    }
-  }
-
-  Status ForceShutdown() {
-    // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
-    // Since acero's executor won't terminate if any one node is paused, we need to
-    // force resume the node before stopping production.
-    backpressure_control_->Resume();
-    return input_->StopProducing();
-  }
-
- private:
-  ExecNode* input_;
-  size_t low_threshold_;
-  size_t high_threshold_;
-  std::unique_ptr<BackpressureControl> backpressure_control_;
-};
-
-template <typename T>
-class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
- private:
-  struct DoHandle {
-    explicit DoHandle(BackpressureConcurrentQueue& queue)
-        : queue_(queue), start_size_(queue_.UnsyncSize()) {}
-
-    ~DoHandle() {
-      size_t end_size = queue_.UnsyncSize();
-      queue_.handler_.Handle(start_size_, end_size);
-    }
-
-    BackpressureConcurrentQueue& queue_;
-    size_t start_size_;
-  };
-
- public:
-  explicit BackpressureConcurrentQueue(BackpressureHandler handler)
-      : handler_(std::move(handler)) {}
-
-  T Pop() {
-    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
-    DoHandle do_handle(*this);
-    return ConcurrentQueue<T>::PopUnlocked();
-  }
-
-  void Push(const T& item) {
-    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
-    DoHandle do_handle(*this);
-    ConcurrentQueue<T>::PushUnlocked(item);
-  }
-
-  void Clear() {
-    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
-    DoHandle do_handle(*this);
-    ConcurrentQueue<T>::ClearUnlocked();
-  }
-
-  std::optional<T> TryPop() {
-    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
-    DoHandle do_handle(*this);
-    return ConcurrentQueue<T>::TryPopUnlocked();
-  }
-
-  Status ForceShutdown() { return handler_.ForceShutdown(); }
-
- private:
-  BackpressureHandler handler_;
-};
-
 class InputState {
   // InputState corresponds to an input
   // Input record batches are queued up in InputState until processed and
@@ -783,29 +609,8 @@ class InputState {
   }
 
   inline OnType GetLatestTime() const {
-    return GetTime(GetLatestBatch().get(), latest_ref_row_);
-  }
-
-  inline ByType GetTime(const RecordBatch* batch, row_index_t row) const {
-    auto data = batch->column_data(time_col_index_);
-    switch (time_type_id_) {
-      LATEST_VAL_CASE(INT8, time_value)
-      LATEST_VAL_CASE(INT16, time_value)
-      LATEST_VAL_CASE(INT32, time_value)
-      LATEST_VAL_CASE(INT64, time_value)
-      LATEST_VAL_CASE(UINT8, time_value)
-      LATEST_VAL_CASE(UINT16, time_value)
-      LATEST_VAL_CASE(UINT32, time_value)
-      LATEST_VAL_CASE(UINT64, time_value)
-      LATEST_VAL_CASE(DATE32, time_value)
-      LATEST_VAL_CASE(DATE64, time_value)
-      LATEST_VAL_CASE(TIME32, time_value)
-      LATEST_VAL_CASE(TIME64, time_value)
-      LATEST_VAL_CASE(TIMESTAMP, time_value)
-      default:
-        DCHECK(false);
-        return 0;  // cannot happen
-    }
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
   }
 
 #undef LATEST_VAL_CASE
@@ -832,7 +637,9 @@ class InputState {
         have_active_batch &= !queue_.TryPop();
         if (have_active_batch) {
           DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0);  // empty batches disallowed
-          memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0));  // time changed
+          memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), time_type_id_,
+                                   time_col_index_,
+                                   0));  // time changed
         }
       }
     }
@@ -988,35 +795,25 @@ class InputState {
   std::vector<std::optional<col_index_t>> src_to_dst_;
 };
 
+/// Wrapper around UnmaterializedCompositeTable that knows how to emplace
+/// the join row-by-row
 template <size_t MAX_TABLES>
-struct CompositeReferenceRow {
-  struct Entry {
-    arrow::RecordBatch* batch;  // can be NULL if there's no value
-    row_index_t row;
-  };
-  Entry refs[MAX_TABLES];
-};
+class CompositeTableBuilder {
+  using SliceBuilder = UnmaterializedSliceBuilder<MAX_TABLES>;
+  using CompositeTable = UnmaterializedCompositeTable<MAX_TABLES>;
 
-// A table of composite reference rows.  Rows maintain pointers to the
-// constituent record batches, but the overall table retains shared_ptr
-// references to ensure memory remains resident while the table is live.
-//
-// The main reason for this is that, especially for wide tables, joins
-// are effectively row-oriented, rather than column-oriented.  Separating
-// the join part from the columnar materialization part simplifies the
-// logic around data types and increases efficiency.
-//
-// We don't put the shared_ptr's into the rows for efficiency reasons.
-template <size_t MAX_TABLES>
-class CompositeReferenceTable {
  public:
-  NDEBUG_EXPLICIT CompositeReferenceTable(DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
-      : DEBUG_ADD(n_tables_(n_tables), node_(node)) {
+  NDEBUG_EXPLICIT CompositeTableBuilder(
+      const std::vector<std::unique_ptr<InputState>>& inputs,
+      const std::shared_ptr<Schema>& schema, arrow::MemoryPool* pool,
+      DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
+      : unmaterialized_table(InitUnmaterializedTable(schema, inputs, pool)),
+        DEBUG_ADD(n_tables_(n_tables), node_(node)) {
     DCHECK_GE(n_tables_, 1);
     DCHECK_LE(n_tables_, MAX_TABLES);
   }
 
-  size_t n_rows() const { return rows_.size(); }
+  size_t n_rows() const { return unmaterialized_table.Size(); }
 
   // Adds the latest row from the input state as a new composite reference row
   // - LHS must have a valid key,timestep,and latest rows
@@ -1037,14 +834,16 @@ class CompositeReferenceTable {
       // On the first row of the batch, we resize the destination.
       // The destination size is dictated by the size of the LHS batch.
       row_index_t new_batch_size = lhs_latest_batch->num_rows();
-      row_index_t new_capacity = rows_.size() + new_batch_size;
-      if (rows_.capacity() < new_capacity) rows_.reserve(new_capacity);
+      row_index_t new_capacity = unmaterialized_table.Size() + new_batch_size;
+      if (unmaterialized_table.capacity() < new_capacity) {
+        unmaterialized_table.reserve(new_capacity);
+      }
     }
-    rows_.resize(rows_.size() + 1);
-    auto& row = rows_.back();
-    row.refs[0].batch = lhs_latest_batch.get();
-    row.refs[0].row = lhs_latest_row;
-    AddRecordBatchRef(lhs_latest_batch);
+
+    SliceBuilder new_row{&unmaterialized_table};
+
+    // Each item represents a portion of the columns of the output table
+    new_row.AddEntry(lhs_latest_batch, lhs_latest_row, lhs_latest_row + 1);
 
     DEBUG_SYNC(node_, "Emplace: key=", key, " lhs_latest_row=", lhs_latest_row,
                " lhs_latest_time=", lhs_latest_time, DEBUG_MANIP(std::endl));
@@ -1068,100 +867,25 @@ class CompositeReferenceTable {
         if (tolerance.Accepts(lhs_latest_time, (*opt_entry)->time)) {
           // Have a valid entry
           const MemoStore::Entry* entry = *opt_entry;
-          row.refs[i].batch = entry->batch.get();
-          row.refs[i].row = entry->row;
-          AddRecordBatchRef(entry->batch);
+          new_row.AddEntry(entry->batch, entry->row, entry->row + 1);
           continue;
         }
       }
-      row.refs[i].batch = NULL;
-      row.refs[i].row = 0;
+      new_row.AddEntry(nullptr, 0, 1);
     }
+    new_row.Finalize();
   }
 
   // Materializes the current reference table into a target record batch
-  Result<std::shared_ptr<RecordBatch>> Materialize(
-      MemoryPool* memory_pool, const std::shared_ptr<arrow::Schema>& output_schema,
-      const std::vector<std::unique_ptr<InputState>>& state) {
-    DCHECK_EQ(state.size(), n_tables_);
-
-    // Don't build empty batches
-    size_t n_rows = rows_.size();
-    if (!n_rows) return NULLPTR;
-
-    // Build the arrays column-by-column from the rows
-    std::vector<std::shared_ptr<arrow::Array>> arrays(output_schema->num_fields());
-    for (size_t i_table = 0; i_table < n_tables_; ++i_table) {
-      int n_src_cols = state.at(i_table)->get_schema()->num_fields();
-      {
-        for (col_index_t i_src_col = 0; i_src_col < n_src_cols; ++i_src_col) {
-          std::optional<col_index_t> i_dst_col_opt =
-              state[i_table]->MapSrcToDst(i_src_col);
-          if (!i_dst_col_opt) continue;
-          col_index_t i_dst_col = *i_dst_col_opt;
-          const auto& src_field = state[i_table]->get_schema()->field(i_src_col);
-          const auto& dst_field = output_schema->field(i_dst_col);
-          DCHECK(src_field->type()->Equals(dst_field->type()));
-          DCHECK_EQ(src_field->name(), dst_field->name());
-          const auto& field_type = src_field->type();
-
-#define ASOFJOIN_MATERIALIZE_CASE(id)                                       \
-  case Type::id: {                                                          \
-    using T = typename TypeIdTraits<Type::id>::Type;                        \
-    ARROW_ASSIGN_OR_RAISE(                                                  \
-        arrays.at(i_dst_col),                                               \
-        MaterializeColumn<T>(memory_pool, field_type, i_table, i_src_col)); \
-    break;                                                                  \
-  }
-
-          switch (field_type->id()) {
-            ASOFJOIN_MATERIALIZE_CASE(BOOL)
-            ASOFJOIN_MATERIALIZE_CASE(INT8)
-            ASOFJOIN_MATERIALIZE_CASE(INT16)
-            ASOFJOIN_MATERIALIZE_CASE(INT32)
-            ASOFJOIN_MATERIALIZE_CASE(INT64)
-            ASOFJOIN_MATERIALIZE_CASE(UINT8)
-            ASOFJOIN_MATERIALIZE_CASE(UINT16)
-            ASOFJOIN_MATERIALIZE_CASE(UINT32)
-            ASOFJOIN_MATERIALIZE_CASE(UINT64)
-            ASOFJOIN_MATERIALIZE_CASE(FLOAT)
-            ASOFJOIN_MATERIALIZE_CASE(DOUBLE)
-            ASOFJOIN_MATERIALIZE_CASE(DATE32)
-            ASOFJOIN_MATERIALIZE_CASE(DATE64)
-            ASOFJOIN_MATERIALIZE_CASE(TIME32)
-            ASOFJOIN_MATERIALIZE_CASE(TIME64)
-            ASOFJOIN_MATERIALIZE_CASE(TIMESTAMP)
-            ASOFJOIN_MATERIALIZE_CASE(STRING)
-            ASOFJOIN_MATERIALIZE_CASE(LARGE_STRING)
-            ASOFJOIN_MATERIALIZE_CASE(BINARY)
-            ASOFJOIN_MATERIALIZE_CASE(LARGE_BINARY)
-            default:
-              return Status::Invalid("Unsupported data type ",
-                                     src_field->type()->ToString(), " for field ",
-                                     src_field->name());
-          }
-
-#undef ASOFJOIN_MATERIALIZE_CASE
-        }
-      }
-    }
-
-    // Build the result
-    DCHECK_LE(n_rows, (uint64_t)std::numeric_limits<int64_t>::max());
-    std::shared_ptr<arrow::RecordBatch> r =
-        arrow::RecordBatch::Make(output_schema, (int64_t)n_rows, arrays);
-    return r;
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    return unmaterialized_table.Materialize();
   }
 
   // Returns true if there are no rows
-  bool empty() const { return rows_.empty(); }
+  bool empty() const { return unmaterialized_table.Empty(); }
 
  private:
-  // Contains shared_ptr refs for all RecordBatches referred to by the contents of rows_
-  std::unordered_map<uintptr_t, std::shared_ptr<RecordBatch>> _ptr2ref;
-
-  // Row table references
-  std::vector<CompositeReferenceRow<MAX_TABLES>> rows_;
+  CompositeTable unmaterialized_table;
 
   // Total number of tables in the composite table
   size_t n_tables_;
@@ -1171,70 +895,20 @@ class CompositeReferenceTable {
   AsofJoinNode* node_;
 #endif
 
-  // Adds a RecordBatch ref to the mapping, if needed
-  void AddRecordBatchRef(const std::shared_ptr<RecordBatch>& ref) {
-    if (!_ptr2ref.count((uintptr_t)ref.get())) _ptr2ref[(uintptr_t)ref.get()] = ref;
-  }
-
-  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
-  enable_if_boolean<Type, Status> static BuilderAppend(
-      Builder& builder, const std::shared_ptr<ArrayData>& source, row_index_t row) {
-    if (source->IsNull(row)) {
-      builder.UnsafeAppendNull();
-      return Status::OK();
-    }
-    builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
-    return Status::OK();
-  }
-
-  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
-  enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
-              Status> static BuilderAppend(Builder& builder,
-                                           const std::shared_ptr<ArrayData>& source,
-                                           row_index_t row) {
-    if (source->IsNull(row)) {
-      builder.UnsafeAppendNull();
-      return Status::OK();
-    }
-    using CType = typename TypeTraits<Type>::CType;
-    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
-    return Status::OK();
-  }
-
-  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
-  enable_if_base_binary<Type, Status> static BuilderAppend(
-      Builder& builder, const std::shared_ptr<ArrayData>& source, row_index_t row) {
-    if (source->IsNull(row)) {
-      return builder.AppendNull();
-    }
-    using offset_type = typename Type::offset_type;
-    const uint8_t* data = source->buffers[2]->data();
-    const offset_type* offsets = source->GetValues<offset_type>(1);
-    const offset_type offset0 = offsets[row];
-    const offset_type offset1 = offsets[row + 1];
-    return builder.Append(data + offset0, offset1 - offset0);
-  }
-
-  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
-  Result<std::shared_ptr<Array>> MaterializeColumn(MemoryPool* memory_pool,
-                                                   const std::shared_ptr<DataType>& type,
-                                                   size_t i_table, col_index_t i_col) {
-    ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool));
-    Builder& builder = *checked_cast<Builder*>(a_builder.get());
-    ARROW_RETURN_NOT_OK(builder.Reserve(rows_.size()));
-    for (row_index_t i_row = 0; i_row < rows_.size(); ++i_row) {
-      const auto& ref = rows_[i_row].refs[i_table];
-      if (ref.batch) {
-        Status st =
-            BuilderAppend<Type, Builder>(builder, ref.batch->column_data(i_col), ref.row);
-        ARROW_RETURN_NOT_OK(st);
-      } else {
-        builder.UnsafeAppendNull();
+  static CompositeTable InitUnmaterializedTable(
+      const std::shared_ptr<Schema>& schema,
+      const std::vector<std::unique_ptr<InputState>>& inputs, arrow::MemoryPool* pool) {
+    std::unordered_map<int, std::pair<int, int>> dst_to_src;
+    for (size_t i = 0; i < inputs.size(); i++) {
+      auto& input = inputs[i];
+      for (int src = 0; src < input->get_schema()->num_fields(); src++) {
+        auto dst = input->MapSrcToDst(src);
+        if (dst.has_value()) {
+          dst_to_src[dst.value()] = std::make_pair(static_cast<int>(i), src);
+        }
       }
     }
-    std::shared_ptr<Array> result;
-    ARROW_RETURN_NOT_OK(builder.Finish(&result));
-    return result;
+    return CompositeTable{schema, inputs.size(), dst_to_src, pool};
   }
 };
 
@@ -1279,7 +953,9 @@ class AsofJoinNode : public ExecNode {
     auto& lhs = *state_.at(0);
 
     // Construct new target table if needed
-    CompositeReferenceTable<MAX_JOIN_TABLES> dst(DEBUG_ADD(state_.size(), this));
+    CompositeTableBuilder<MAX_JOIN_TABLES> dst(state_, output_schema_,
+                                               plan()->query_context()->memory_pool(),
+                                               DEBUG_ADD(state_.size(), this));
 
     // Generate rows into the dst table until we either run out of data or hit the row
     // limit, or run out of input
@@ -1318,8 +994,8 @@ class AsofJoinNode : public ExecNode {
     if (dst.empty()) {
       return NULLPTR;
     } else {
-      return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
-                             state_);
+      ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
+      return out.has_value() ? out.value() : NULLPTR;
     }
   }
 
diff --git a/cpp/src/arrow/acero/backpressure_handler.h b/cpp/src/arrow/acero/backpressure_handler.h
new file mode 100644
index 0000000000..178272315d
--- /dev/null
+++ b/cpp/src/arrow/acero/backpressure_handler.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+
+#include <memory>
+
+namespace arrow::acero {
+
+class BackpressureHandler {
+ private:
+  BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
+                      std::unique_ptr<BackpressureControl> backpressure_control)
+      : input_(input),
+        low_threshold_(low_threshold),
+        high_threshold_(high_threshold),
+        backpressure_control_(std::move(backpressure_control)) {}
+
+ public:
+  static Result<BackpressureHandler> Make(
+      ExecNode* input, size_t low_threshold, size_t high_threshold,
+      std::unique_ptr<BackpressureControl> backpressure_control) {
+    if (low_threshold >= high_threshold) {
+      return Status::Invalid("low threshold (", low_threshold,
+                             ") must be less than high threshold (", high_threshold, ")");
+    }
+    if (backpressure_control == NULLPTR) {
+      return Status::Invalid("null backpressure control parameter");
+    }
+    BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
+                                             std::move(backpressure_control));
+    return std::move(backpressure_handler);
+  }
+
+  void Handle(size_t start_level, size_t end_level) {
+    if (start_level < high_threshold_ && end_level >= high_threshold_) {
+      backpressure_control_->Pause();
+    } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
+      backpressure_control_->Resume();
+    }
+  }
+
+  Status ForceShutdown() {
+    // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+    // Since acero's executor won't terminate if any one node is paused, we need to
+    // force resume the node before stopping production.
+    backpressure_control_->Resume();
+    return input_->StopProducing();
+  }
+
+ private:
+  ExecNode* input_;
+  size_t low_threshold_;
+  size_t high_threshold_;
+  std::unique_ptr<BackpressureControl> backpressure_control_;
+};
+
+}  // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/concurrent_queue_internal.h b/cpp/src/arrow/acero/concurrent_queue_internal.h
new file mode 100644
index 0000000000..f530394187
--- /dev/null
+++ b/cpp/src/arrow/acero/concurrent_queue_internal.h
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation of a thread-safe, blocking, unbounded
+ * multi-consumer / multi-producer concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  // Pops the front (oldest) item from the queue, blocking until one is available
+  //
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  // Pops the front (oldest) item from the queue, or returns nullopt if the queue is empty
+  //
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  // Pushes an item to the queue
+  //
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  // Clears the queue
+  //
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }
+
+ protected:
+  std::mutex& GetMutex() { return mutex_; }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+  std::queue<T> queue_;
+
+ private:
+  mutable std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+template <typename T>
+class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+ private:
+  struct DoHandle {
+    explicit DoHandle(BackpressureConcurrentQueue& queue)
+        : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+    ~DoHandle() {
+      // unsynced access is safe since DoHandle is internally only used when the
+      // lock is held
+      size_t end_size = queue_.UnsyncSize();
+      queue_.handler_.Handle(start_size_, end_size);
+    }
+
+    BackpressureConcurrentQueue& queue_;
+    size_t start_size_;
+  };
+
+ public:
+  explicit BackpressureConcurrentQueue(BackpressureHandler handler)
+      : handler_(std::move(handler)) {}
+
+  T Pop() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+    DoHandle do_handle(*this);
+    return ConcurrentQueue<T>::PopUnlocked();
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+    DoHandle do_handle(*this);
+    ConcurrentQueue<T>::PushUnlocked(item);
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+    DoHandle do_handle(*this);
+    ConcurrentQueue<T>::ClearUnlocked();
+  }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
+    DoHandle do_handle(*this);
+    return ConcurrentQueue<T>::TryPopUnlocked();
+  }
+
+  Status ForceShutdown() { return handler_.ForceShutdown(); }
+
+ private:
+  BackpressureHandler handler_;
+};
+
+}  // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 541e5fed62..97119726d4 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -1114,6 +1114,7 @@ void RegisterAggregateNode(ExecFactoryRegistry*);
 void RegisterSinkNode(ExecFactoryRegistry*);
 void RegisterHashJoinNode(ExecFactoryRegistry*);
 void RegisterAsofJoinNode(ExecFactoryRegistry*);
+void RegisterSortedMergeNode(ExecFactoryRegistry*);
 
 }  // namespace internal
 
@@ -1132,6 +1133,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
       internal::RegisterSinkNode(this);
       internal::RegisterHashJoinNode(this);
       internal::RegisterAsofJoinNode(this);
+      internal::RegisterSortedMergeNode(this);
     }
 
     Result<Factory> GetFactory(const std::string& factory_name) override {
diff --git a/cpp/src/arrow/acero/sorted_merge_node.cc b/cpp/src/arrow/acero/sorted_merge_node.cc
new file mode 100644
index 0000000000..f3b934eda1
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node.cc
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <any>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue_internal.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using SingleRecordBatchSliceBuilder = arrow::acero::UnmaterializedSliceBuilder<1>;
+using SingleRecordBatchCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+constexpr bool kNewTask = true;
+constexpr bool kPoisonPill = false;
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState corresponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  void Advance(SingleRecordBatchSliceBuilder& builder) {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. If there is no queued input, this returns without adding an
+    // entry to the builder
+    bool active =
+        (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+    if (!active) {
+      return;
+    }
+
+    row_index_t start = latest_ref_row_;
+    row_index_t end = latest_ref_row_;
+    time_unit_t startTime = GetLatestTime();
+    std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+    auto rows_in_batch = (row_index_t)batch->num_rows();
+
+    while (GetLatestTime() == startTime) {
+      end = ++latest_ref_row_;
+      if (latest_ref_row_ >= rows_in_batch) {
+        // hit the end of the batch, need to get the next batch if
+        // possible.
+        ++batches_processed_;
+        latest_ref_row_ = 0;
+        active &= !queue_.TryPop();
+        if (active) {
+          DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+                    0);  // empty batches disallowed, sanity check
+        }
+        break;
+      }
+    }
+    builder.AddEntry(batch, start, end);
+  }
+
+  arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+    if (rb->num_rows() > 0) {
+      queue_.Push(rb);
+    } else {
+      ++batches_processed_;  // don't enqueue empty batches, just record
+                             // as processed
+    }
+    return arrow::Status::OK();
+  }
+
+  const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+  void set_total_batches(int n) { total_batches_ = n; }
+
+ private:
+  size_t index_;
+  // Pending record batches. The latest is the front. Batches cannot be empty.
+  BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+  // Schema associated with the input
+  std::shared_ptr<arrow::Schema> schema_;
+  // Total number of batches (only int because InputFinished uses int)
+  std::atomic<int> total_batches_{-1};
+  // Number of batches processed so far (only int because InputFinished uses
+  // int)
+  std::atomic<int> batches_processed_{0};
+  // Index of the time col
+  col_index_t time_col_index_;
+  // Type id of the time column
+  arrow::Type::type time_type_id_;
+  // Index of the latest row reference within the front batch; if >0 then queue_
+  // cannot be empty. Must be < queue_.front()->num_rows() if queue_ is non-empty
+  row_index_t latest_ref_row_ = 0;
+  // Time of latest row
+  time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest();
+};
+
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    // True if lhs is ahead of time of rhs
+    if (lhs->Finished()) {
+      return false;
+    }
+    if (rhs->Finished()) {
+      return false;
+    }
+    time_unit_t lFirst = lhs->GetLatestTime();
+    time_unit_t rFirst = rhs->GetLatestTime();
+    return lFirst > rFirst;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+  static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+  SortedMergeNode(arrow::acero::ExecPlan* plan,
+                  std::vector<arrow::acero::ExecNode*> inputs,
+                  std::shared_ptr<arrow::Schema> output_schema,
+                  arrow::Ordering new_ordering)
+      : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+        ordering_(std::move(new_ordering)),
+        input_counter(inputs_.size()),
+        output_counter(inputs_.size()),
+        process_thread() {
+    SetLabel("sorted_merge");
+  }
+
+  ~SortedMergeNode() override {
+    process_queue.Push(
+        kPoisonPill);  // poison pill
+                       // We might create a temporary (such as to inspect the output
+                       // schema), in which case there isn't anything to merge
+    if (process_thread.joinable()) {
+      process_thread.join();
+    }
+  }
+
+  static arrow::Result<arrow::acero::ExecNode*> Make(
+      arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+      const arrow::acero::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()),
+                                         "SortedMergeNode"));
+
+    if (inputs.size() < 1) {
+      return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+    }
+
+    const auto schema = inputs.at(0)->output_schema();
+    for (const auto& input : inputs) {
+      if (!input->output_schema()->Equals(schema)) {
+        return Status::Invalid(
+            "SortedMergeNode input schemas must all "
+            "match, first schema "
+            "was: ",
+            schema->ToString(), " got schema: ", input->output_schema()->ToString());
+      }
+    }
+
+    const auto& order_options =
+        arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+
+    if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+      return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+    }
+
+    std::shared_ptr<Schema> output_schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<SortedMergeNode>(
+        plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+  }
+
+  const char* kind_name() const override { return "SortedMergeNode"; }
+
+  const arrow::Ordering& ordering() const override { return ordering_; }
+
+  arrow::Status Init() override {
+    ARROW_CHECK(ordering_.sort_keys().size() == 1) << "Only one sort key supported";
+
+    auto inputs = this->inputs();
+    for (size_t i = 0; i < inputs.size(); i++) {
+      ExecNode* input = inputs[i];
+      const auto& schema = input->output_schema();
+
+      const auto& sort_key = ordering_.sort_keys()[0];
+      if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+        return Status::NotImplemented("Only ascending sort order is supported");
+      }
+
+      const FieldRef& ref = sort_key.target;
+      auto match_res = ref.FindOne(*schema);
+      if (!match_res.ok()) {
+        return Status::Invalid("Bad sort key : ", match_res.status().message());
+      }
+      ARROW_ASSIGN_OR_RAISE(auto match, match_res);
+      ARROW_DCHECK(match.indices().size() == 1);
+
+      ARROW_ASSIGN_OR_RAISE(auto input_state,
+                            InputState::Make<std::shared_ptr<InputState>>(
+                                i, input, this, backpressure_counter, schema,
+                                std::move(match.indices()[0])));
+      state.push_back(std::move(input_state));
+    }
+    return Status::OK();
+  }
+
+  arrow::Status InputReceived(arrow::acero::ExecNode* input,
+                              arrow::ExecBatch batch) override {
+    ARROW_DCHECK(std_has(inputs_, input));
+    const size_t index = std_find(inputs_, input) - inputs_.begin();
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> rb,
+                          batch.ToRecordBatch(output_schema_));
+
+    // Push into the queue. Note that we don't need to lock since
+    // InputState's ConcurrentQueue manages locking
+    input_counter[index] += rb->num_rows();
+    ARROW_RETURN_NOT_OK(state[index]->Push(rb));
+    process_queue.Push(kNewTask);
+    return Status::OK();
+  }
+
+  arrow::Status InputFinished(arrow::acero::ExecNode* input, int total_batches) override {
+    ARROW_DCHECK(std_has(inputs_, input));
+    {
+      std::lock_guard<std::mutex> guard(gate);
+      ARROW_DCHECK(std_has(inputs_, input));
+      size_t k = std_find(inputs_, input) - inputs_.begin();
+      state.at(k)->set_total_batches(total_batches);
+    }
+    // Trigger a final process call for stragglers
+    process_queue.Push(kNewTask);
+    return Status::OK();
+  }
+
+  arrow::Status StartProducing() override {
+    ARROW_ASSIGN_OR_RAISE(process_task, plan_->query_context()->BeginExternalTask(
+                                            "SortedMergeNode::ProcessThread"));
+    if (!process_task.is_valid()) {
+      // Plan has already aborted.  Do not start process thread
+      return Status::OK();
+    }
+    process_thread = std::thread(&SortedMergeNode::StartPoller, this);
+    return Status::OK();
+  }
+
+  arrow::Status StopProducingImpl() override {
+    process_queue.Clear();
+    process_queue.Push(kPoisonPill);
+    return Status::OK();
+  }
+
+  // handled by the backpressure controller
+  void PauseProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+  void ResumeProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+
+ protected:
+  std::string ToStringExtra(int indent) const override {
+    std::stringstream ss;
+    ss << "ordering=" << ordering_.ToString();
+    return ss.str();
+  }
+
+ private:
+  void EndFromProcessThread(arrow::Status st = arrow::Status::OK()) {
+    ARROW_CHECK(!cleanup_started);
+    for (size_t i = 0; i < input_counter.size(); ++i) {
+      ARROW_CHECK(input_counter[i] == output_counter[i])
+          << input_counter[i] << " != " << output_counter[i];
+    }
+
+    ARROW_UNUSED(
+        plan_->query_context()->executor()->Spawn([this, st = std::move(st)]() mutable {
+          Defer cleanup([this, &st]() { process_task.MarkFinished(st); });
+          if (st.ok()) {
+            st = output_->InputFinished(this, batches_produced);
+          }
+        }));
+  }
+
+  bool CheckEnded() {
+    bool all_finished = true;
+    for (const auto& s : state) {
+      all_finished &= s->Finished();
+    }
+    if (all_finished) {
+      EndFromProcessThread();
+      return false;
+    }
+    return true;
+  }
+
+  /// Streams the input states in sorted order until we run out of input
+  arrow::Result<std::shared_ptr<arrow::RecordBatch>> getNextBatch() {
+    DCHECK(!state.empty());
+    for (const auto& s : state) {
+      if (s->Empty() && !s->Finished()) {
+        return nullptr;  // not enough data, wait
+      }
+    }
+
+    std::vector<std::shared_ptr<InputState>> heap = state;
+    // filter out finished states
+    heap.erase(std::remove_if(
+                   heap.begin(), heap.end(),
+                   [](const std::shared_ptr<InputState>& s) { return s->Finished(); }),
+               heap.end());
+
+    // If any are Empty(), then return early since we don't have enough data
+    if (std::any_of(heap.begin(), heap.end(),
+                    [](const std::shared_ptr<InputState>& s) { return s->Empty(); })) {
+      return nullptr;
+    }
+
+    // Currently we only support one sort key
+    const auto sort_col = *ordering_.sort_keys().at(0).target.name();
+    const auto comp = InputStateComparator();
+    std::make_heap(heap.begin(), heap.end(), comp);
+
+    // Each slice only has one record batch with the same schema as the output
+    std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+    for (int i = 0; i < output_schema_->num_fields(); i++) {
+      output_col_to_src[i] = std::make_pair(0, i);
+    }
+    SingleRecordBatchCompositeTable output(output_schema(), 1,
+                                           std::move(output_col_to_src),
+                                           plan()->query_context()->memory_pool());
+
+    // Generate rows until we run out of data or we exceed the target output
+    // size
+    bool waiting_for_more_data = false;
+    while (!waiting_for_more_data && !heap.empty() &&
+           output.Size() < kTargetOutputBatchSize) {
+      std::pop_heap(heap.begin(), heap.end(), comp);
+
+      auto& next_item = heap.back();
+      time_unit_t latest_time = std::numeric_limits<time_unit_t>::min();
+      time_unit_t new_time = next_item->GetLatestTime();
+      ARROW_CHECK(new_time >= latest_time)
+          << "Input state " << next_item->index()
+          << " has out of order data. newTime=" << new_time
+          << " latestTime=" << latest_time;
+
+      latest_time = new_time;
+      SingleRecordBatchSliceBuilder builder{&output};
+      next_item->Advance(builder);
+
+      if (builder.Size() > 0) {
+        output_counter[next_item->index()] += builder.Size();
+        builder.Finalize();
+      }
+      if (next_item->Finished()) {
+        heap.pop_back();
+      } else if (next_item->Empty()) {
+        // We've run out of data on one of the inputs
+        waiting_for_more_data = true;
+        continue;  // skip the unnecessary make_heap
+      }
+      std::make_heap(heap.begin(), heap.end(), comp);
+    }
+
+    // Emit the batch
+    if (output.Size() == 0) {
+      return nullptr;
+    }
+
+    ARROW_ASSIGN_OR_RAISE(auto maybe_rb, output.Materialize());
+    return maybe_rb.value_or(nullptr);
+  }
+  /// Polls the inputs and emits any ready batches. Returns true if there may be
+  /// more data to process later, false if we are done or an error occurred
+  bool PollOnce() {
+    std::lock_guard<std::mutex> guard(gate);
+    if (!CheckEnded()) {
+      return false;
+    }
+
+    // Process batches while we have data
+    for (;;) {
+      Result<std::shared_ptr<RecordBatch>> result = getNextBatch();
+
+      if (result.ok()) {
+        auto out_rb = *result;
+        if (!out_rb) {
+          break;
+        }
+        ExecBatch out_b(*out_rb);
+        out_b.index = batches_produced++;
+        Status st = output_->InputReceived(this, std::move(out_b));
+        if (!st.ok()) {
+          ARROW_LOG(FATAL) << "Error in output_::InputReceived: " << st.ToString();
+          EndFromProcessThread(std::move(st));
+        }
+      } else {
+        EndFromProcessThread(result.status());
+        return false;
+      }
+    }
+
+    // Report to the output the total batch count, if we've already
+    // finished everything (there are two places where this can happen:
+    // here and InputFinished)
+    //
+    // It may happen here in cases where InputFinished was called before
+    // we were finished producing results (so we didn't know the output
+    // size at that time)
+    if (!CheckEnded()) {
+      return false;
+    }
+
+    // There is no more we can do now but there is still work remaining
+    // for later when more data arrives.
+    return true;
+  }
+
+  void EmitBatches() {
+    while (true) {
+      // Implementation note: If the queue is empty, we will block here
+      if (process_queue.Pop() == kPoisonPill) {
+        EndFromProcessThread();
+      }
+      // Either we're out of data or something went wrong
+      if (!PollOnce()) {
+        return;
+      }
+    }
+  }
+
+  /// The entry point for processThread
+  static void StartPoller(SortedMergeNode* node) { node->EmitBatches(); }
+
+  arrow::Ordering ordering_;
+
+  // Each input state corresponds to an input (e.g. a parquet data file)
+  std::vector<std::shared_ptr<InputState>> state;
+  std::vector<std::atomic_int64_t> input_counter;
+  std::vector<std::atomic_int64_t> output_counter;
+  std::mutex gate;
+
+  std::atomic<bool> cleanup_started{false};
+
+  // Backpressure counter common to all input states
+  std::atomic<int32_t> backpressure_counter;
+
+  std::atomic<int32_t> batches_produced{0};
+
+  // Queue to trigger processing of a given input. False acts as a poison pill
+  ConcurrentQueue<bool> process_queue;
+  // Once StartProducing is called, we initialize this thread to poll the
+  // input states and emit batches
+  std::thread process_thread;
+  arrow::Future<> process_task;
+
+  // Map arg index --> completion counter
+  std::vector<arrow::acero::AtomicCounter> counter_;
+  // Map arg index --> data
+  std::vector<InputState> accumulation_queue_;
+  std::mutex mutex_;
+  std::atomic<int> total_batches_{0};
+};
+
+}  // namespace
+
+namespace internal {
+void RegisterSortedMergeNode(ExecFactoryRegistry* registry) {
+  DCHECK_OK(registry->AddFactory("sorted_merge", SortedMergeNode::Make));
+}
+}  // namespace internal
+
+}  // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/sorted_merge_node_test.cc b/cpp/src/arrow/acero/sorted_merge_node_test.cc
new file mode 100644
index 0000000000..55446d631d
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node_test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/test_nodes.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/compute/ordering.h"
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::acero {
+
+std::shared_ptr<Table> TestTable(int start, int step, int rows_per_batch,
+                                 int num_batches) {
+  return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)},
+                   {"str", gen::Random(utf8())}})
+      ->FailOnError()
+      ->Table(rows_per_batch, num_batches);
+}
+
+TEST(SortedMergeNode, Basic) {
+  auto table1 = TestTable(
+      /*start=*/0,
+      /*step=*/2,
+      /*rows_per_batch=*/2,
+      /*num_batches=*/3);
+  auto table2 = TestTable(
+      /*start=*/1,
+      /*step=*/2,
+      /*rows_per_batch=*/3,
+      /*num_batches=*/2);
+  auto table3 = TestTable(
+      /*start=*/3,
+      /*step=*/3,
+      /*rows_per_batch=*/6,
+      /*num_batches=*/1);
+  std::vector<Declaration::Input> src_decls;
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3)));
+
+  auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}));
+
+  Declaration sorted_merge{"sorted_merge", src_decls, ops};
+  // We can't use threads for sorted merging since it relies on
+  // ascending deterministic order of timestamps
+  ASSERT_OK_AND_ASSIGN(auto output,
+                       DeclarationToTable(sorted_merge, /*use_threads=*/false));
+  ASSERT_EQ(output->num_rows(), 18);
+
+  ASSERT_OK_AND_ASSIGN(auto expected_ts_builder,
+                       MakeBuilder(int32(), default_memory_pool()));
+  for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) {
+    ASSERT_OK(expected_ts_builder->AppendScalar(*MakeScalar(i)));
+  }
+  ASSERT_OK_AND_ASSIGN(auto expected_ts, expected_ts_builder->Finish());
+  auto output_col = output->column(0);
+  ASSERT_OK_AND_ASSIGN(auto output_ts, Concatenate(output_col->chunks()));
+
+  AssertArraysEqual(*expected_ts, *output_ts);
+}
+
+}  // namespace arrow::acero
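
For inputs that do not fit in memory, the merged stream can also be consumed
incrementally rather than collected into a table. A minimal sketch, assuming the same
"sorted_merge" declaration as in the test above and single-threaded consumption via
DeclarationToReader so the ascending order is preserved; the wrapper function below is
illustrative only and not part of this commit:

    #include "arrow/acero/exec_plan.h"
    #include "arrow/record_batch.h"
    #include "arrow/result.h"

    arrow::Status ConsumeSortedMerge(arrow::acero::Declaration sorted_merge) {
      // Single-threaded so the ascending order produced by the merge is preserved
      ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                            arrow::acero::DeclarationToReader(std::move(sorted_merge),
                                                              /*use_threads=*/false));
      while (true) {
        ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> batch, reader->Next());
        if (!batch) break;  // end of stream
        // ... process one ordered batch at a time ...
      }
      return arrow::Status::OK();
    }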
diff --git a/cpp/src/arrow/acero/time_series_util.cc b/cpp/src/arrow/acero/time_series_util.cc
new file mode 100644
index 0000000000..71133fef47
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/data.h"
+
+#include "arrow/acero/time_series_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+template <typename T, enable_if_t<std::is_integral<T>::value, bool>>
+inline uint64_t NormalizeTime(T t) {
+  uint64_t bias =
+      std::is_signed<T>::value ? static_cast<uint64_t>(1) << (8 * sizeof(T) - 1) : 0;
+  return t < 0 ? static_cast<uint64_t>(t + bias) : static_cast<uint64_t>(t);
+}
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row) {
+#define LATEST_VAL_CASE(id, val)                     \
+  case Type::id: {                                   \
+    using T = typename TypeIdTraits<Type::id>::Type; \
+    using CType = typename TypeTraits<T>::CType;     \
+    return val(data->GetValues<CType>(1)[row]);      \
+  }
+
+  auto data = batch->column_data(col);
+  switch (time_type) {
+    LATEST_VAL_CASE(INT8, NormalizeTime)
+    LATEST_VAL_CASE(INT16, NormalizeTime)
+    LATEST_VAL_CASE(INT32, NormalizeTime)
+    LATEST_VAL_CASE(INT64, NormalizeTime)
+    LATEST_VAL_CASE(UINT8, NormalizeTime)
+    LATEST_VAL_CASE(UINT16, NormalizeTime)
+    LATEST_VAL_CASE(UINT32, NormalizeTime)
+    LATEST_VAL_CASE(UINT64, NormalizeTime)
+    LATEST_VAL_CASE(DATE32, NormalizeTime)
+    LATEST_VAL_CASE(DATE64, NormalizeTime)
+    LATEST_VAL_CASE(TIME32, NormalizeTime)
+    LATEST_VAL_CASE(TIME64, NormalizeTime)
+    LATEST_VAL_CASE(TIMESTAMP, NormalizeTime)
+    default:
+      DCHECK(false);
+      return 0;  // cannot happen
+  }
+
+#undef LATEST_VAL_CASE
+}
+
+}  // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/time_series_util.h b/cpp/src/arrow/acero/time_series_util.h
new file mode 100644
index 0000000000..97707f43bf
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// Normalize the value to an unsigned 64-bit integer while preserving the ordering of values
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+uint64_t NormalizeTime(T t);
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row);
+
+}  // namespace arrow::acero
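
A small usage sketch for the helpers above; the batch layout (an int64 sort key stored
at column index 0) and the wrapper function are assumptions made for illustration:

    #include "arrow/acero/time_series_util.h"
    #include "arrow/record_batch.h"

    // Read the normalized, order-preserving time key from the first row of a batch
    // whose sort column is int64 and stored at column index 0 (illustrative layout).
    uint64_t FirstTimeKey(const std::shared_ptr<arrow::RecordBatch>& batch) {
      return arrow::acero::GetTime(batch.get(), arrow::Type::INT64, /*col=*/0, /*row=*/0);
    }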
diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h
new file mode 100644
index 0000000000..05d6c86693
--- /dev/null
+++ b/cpp/src/arrow/acero/unmaterialized_table.h
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptrs into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline bool Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from the address of a record batch to the record batch itself. Used to
+  /// keep each referenced record batch alive even after the main exec node thread
+  /// has released its reference to it
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    ptr2Ref[(uintptr_t)ref.get()] = ref;
+  }
+  void AddSlice(const UnmaterializedSlice& slice) {
+    slices.push_back(slice);
+    num_rows += slice.Size();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_boolean<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+              Status> static BuilderAppend(Builder& builder,
+                                           const std::shared_ptr<ArrayData>& source,
+                                           uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    using CType = typename TypeTraits<Type>::CType;
+    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_base_binary<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      return builder.AppendNull();
+    }
+    using offset_type = typename Type::offset_type;
+    const uint8_t* data = source->buffers[2]->data();
+    const offset_type* offsets = source->GetValues<offset_type>(1);
+    const offset_type offset0 = offsets[row];
+    const offset_type offset1 = offsets[row + 1];
+    return builder.Append(data + offset0, offset1 - offset0);
+  }
+
+  template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+  arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+      const std::shared_ptr<arrow::DataType>& type, int i_col) {
+    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+    Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+    ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+
+    const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+    for (const auto& unmaterialized_slice : slices) {
+      const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+      if (batch) {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          arrow::Status st = BuilderAppend<Type, Builder>(
+              builder, batch->column_data(column_index), rowNum);
+          ARROW_RETURN_NOT_OK(st);
+        }
+      } else {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          ARROW_RETURN_NOT_OK(builder.AppendNull());
+        }
+      }
+    }
+    std::shared_ptr<arrow::Array> result;
+    ARROW_RETURN_NOT_OK(builder.Finish(&result));
+    return Result{std::move(result)};
+  }
+};
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder {
+ public:
+  explicit UnmaterializedSliceBuilder(
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>* table_)
+      : table(table_) {}
+
+  void AddEntry(std::shared_ptr<RecordBatch> rb, uint64_t start, uint64_t end) {
+    if (rb) {
+      table->AddRecordBatchRef(rb);
+    }
+    if (slice.num_components) {
+      size_t last_index = slice.num_components - 1;
+      DCHECK_EQ(slice.components[last_index].end - slice.components[last_index].start,
+                end - start)
+          << "Slices should be the same length. ";
+    }
+    slice.components[slice.num_components++] = CompositeEntry{rb.get(), start, end};
+  }
+
+  void Finalize() { table->AddSlice(slice); }
+  int64_t Size() { return slice.Size(); }
+
+ private:
+  using TUnmaterializedCompositeTable =
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>;
+  using TUnmaterializedSlice =
+      typename TUnmaterializedCompositeTable::UnmaterializedSlice;
+
+  TUnmaterializedCompositeTable* table;
+  TUnmaterializedSlice slice{};
+};
+
+}  // namespace arrow::acero
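
To illustrate how the two classes above fit together, here is a hedged sketch that
builds one three-row slice from two hypothetical source batches and materializes it;
the schema, column mapping, and row ranges are assumptions made for the example only:

    #include <memory>
    #include <unordered_map>
    #include <utility>

    #include "arrow/acero/unmaterialized_table.h"

    arrow::Result<std::optional<std::shared_ptr<arrow::RecordBatch>>> MergeOneSlice(
        const std::shared_ptr<arrow::Schema>& output_schema,
        const std::shared_ptr<arrow::RecordBatch>& left,
        const std::shared_ptr<arrow::RecordBatch>& right) {
      // Output column i is drawn from (source table index, source column index).
      // Here output column 0 comes from left[0] and output column 1 from right[0];
      // output_schema is assumed to have matching field types.
      std::unordered_map<int, std::pair<int, int>> output_col_to_src = {{0, {0, 0}},
                                                                        {1, {1, 0}}};
      arrow::acero::UnmaterializedCompositeTable<2> table(
          output_schema, /*num_composite_tables=*/2, std::move(output_col_to_src));

      // Every entry added to a slice must cover the same number of rows
      arrow::acero::UnmaterializedSliceBuilder<2> builder(&table);
      builder.AddEntry(left, /*start=*/0, /*end=*/3);   // rows [0, 3) of the left batch
      builder.AddEntry(right, /*start=*/5, /*end=*/8);  // rows [5, 8) of the right batch
      builder.Finalize();  // the slice now contributes 3 rows to the table

      // Column-by-column materialization into a single RecordBatch
      return table.Materialize();
    }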