You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "JerAguilon (via GitHub)" <gi...@apache.org> on 2023/10/20 22:40:24 UTC

[PR] Create sorted merge node [arrow]

JerAguilon opened a new pull request, #38380:
URL: https://github.com/apache/arrow/pull/38380

   <!--
   Thanks for opening a pull request!
   If this is your first pull request you can find detailed information on how 
   to contribute here:
     * [New Contributor's Guide](https://arrow.apache.org/docs/dev/developers/guide/step_by_step/pr_lifecycle.html#reviews-and-merge-of-the-pull-request)
     * [Contributing Overview](https://arrow.apache.org/docs/dev/developers/overview.html)
   
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose
   
   Opening GitHub issues ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename the pull request title in the following format?
   
       GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   In the case of PARQUET issues on JIRA the title also supports:
   
       PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   -->
   
   ### Rationale for this change
   
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   ### What changes are included in this PR?
   
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   ### Are these changes tested?
   
   <!--
   We typically require tests for all PRs in order to:
   1. Prevent the code from being accidentally broken by subsequent changes
   2. Serve as another way to document the expected behavior of the code
   
   If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
   -->
   
   ### Are there any user-facing changes?
   
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please uncomment the line below and explain which changes are breaking.
   -->
   <!-- **This PR includes breaking changes to public APIs.** -->
   
   <!--
   Please uncomment the line below (and provide explanation) if the changes fix either (a) a security vulnerability, (b) a bug that caused incorrect or invalid data to be produced, or (c) a bug that causes a crash (even when the API contract is upheld). We use this to highlight fixes to issues that may affect users without their knowledge. For this reason, fixing bugs that cause errors don't count, since those are usually obvious.
   -->
   <!-- **This PR contains a "Critical Fix".** -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1793569675

   NVM, checking out arrow/main for `testing` worked. Ready for another look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   At a high level this is how this new abstraction works:
   
   In the case of the asofjoin node, we create a `CompositeTableBuilder` local, which wraps this object when we have buffered enough data to create a new output batch. This is pretty much the same as `CompositeReferenceTable` in the current implementation.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as it's materialized.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each batch, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Create sorted merge node [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1773476557

   <!--
     Licensed to the Apache Software Foundation (ASF) under one
     or more contributor license agreements.  See the NOTICE file
     distributed with this work for additional information
     regarding copyright ownership.  The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
     with the License.  You may obtain a copy of the License at
   
       http://www.apache.org/licenses/LICENSE-2.0
   
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     KIND, either express or implied.  See the License for the
     specific language governing permissions and limitations
     under the License.
   -->
   
   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/main/CONTRIBUTING.md#Minor-Fixes). Could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose
   
   Opening GitHub issues ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename the pull request title in the following format?
   
       GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   In the case of PARQUET issues on JIRA the title also supports:
   
       PARQUET-${JIRA_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373695176


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;
+
+  inline int64_t Size() const {
+    if (num_components == 0) {
+      return 0;
+    }
+    return components[0].end - components[0].start;
+  }
+};
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons, so the caller
+/// must manually call addRecordBatchRef to maintain the lifetime of the stored
+/// record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {

Review Comment:
   Somewhat. I have made some slight API implementation changes. The sorted merge node creates true slices of N rows at a time.
   
   Example: Suppose the next biggest in32 timestamp is 1234. We will:
   
   1. heapify our N inputs, and surface the input with ts 1234 as its next row
   2. We could emit just the "head" of the input. 
   3. ...But suppose the 1000 rows below are still 1234 (pretty common in real world data). We can greatly improve the speed and reduce redundant heapify calls if we just take the current row and the next 1k rows as well, and submit them as a "slice" of sorts
   
   OTOH, the asof join was just doing things row by row. So the main change is we now store a `record batch, [start, end)` range rather than a `record batch, row`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today, which indeed avoids accumulating the entire input in memory: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   At a high level this is how this new abstraction works:
   
   In the case of the asofjoin node, we create a `CompositeTableBuilder` local, which wraps this object when we have buffered enough data to create a new output batch. This is pretty much the same as `CompositeReferenceTable` in the current implementation.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as we have `Materialize()`'d a record batch and sent it to the output node.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each output batch we intend to emit, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   In the case of the asofjoin node, we create a `CompositeTableBuilder` local, which wraps this object when we have buffered enough data to create a new output batch.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as it's materialized.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each batch, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1372442687


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;

Review Comment:
   nvm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1377657546


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    if (!ptr2Ref.count((uintptr_t)ref.get())) {
+      ptr2Ref[(uintptr_t)ref.get()] = ref;
+    }
+  }

Review Comment:
   ```suggestion
     void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
       ptr2Ref[(uintptr_t)ref.get()] = ref;
     }
   ```
   Why check for existence first?



##########
cpp/src/arrow/acero/asof_join_node.cc:
##########
@@ -118,96 +122,16 @@ struct TolType {
 };
 
 // Maximum number of tables that can be joined
-#define MAX_JOIN_TABLES 64
+#define MAX_JOIN_TABLES 4  // NOCOMMIT

Review Comment:
   NOCOMMIT?



##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   When do entries get removed from this map?



##########
cpp/src/arrow/acero/sorted_merge_node_test.cc:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/test_nodes.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/compute/ordering.h"
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::acero {
+
+std::shared_ptr<Table> TestTable(int start, int step, int rows_per_batch,
+                                 int num_batches) {
+  return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)},
+                   {"str", gen::Random(utf8())}})
+      ->FailOnError()
+      ->Table(rows_per_batch, num_batches);
+}
+
+TEST(SortedMergeNode, Basic) {
+  auto table1 = TestTable(
+      /*start=*/0,
+      /*step=*/2,
+      /*rows_per_batch=*/2,
+      /*num_batches=*/3);
+  auto table2 = TestTable(
+      /*start=*/1,
+      /*step=*/2,
+      /*rows_per_batch=*/3,
+      /*num_batches=*/2);
+  auto table3 = TestTable(
+      /*start=*/3,
+      /*step=*/3,
+      /*rows_per_batch=*/6,
+      /*num_batches=*/1);
+  std::vector<Declaration::Input> src_decls;
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3)));
+
+  auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}));
+
+  Declaration sorted_merge{"sorted_merge", src_decls, ops};
+  // We can't use threads for sorted merging since it relies on
+  // ascending deterministic order of timestamps
+  ASSERT_OK_AND_ASSIGN(auto output,
+                       DeclarationToTable(sorted_merge, /*use_threads=*/false));
+  ASSERT_EQ(output->num_rows(), 18);
+
+  ASSERT_OK_AND_ASSIGN(auto expected_ts_builder,
+                       MakeBuilder(int32(), default_memory_pool()));
+  for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) {
+    ASSERT_OK(expected_ts_builder->AppendScalar(*MakeScalar(i)));
+  }
+  ASSERT_OK_AND_ASSIGN(auto expected_timestamps, expected_ts_builder->Finish());
+  auto chunked_array =
+      std::make_shared<arrow::ChunkedArray>(std::move(expected_timestamps));
+  ASSERT_TRUE(chunked_array->Equals(output->column(0)))
+      << chunked_array->ToString() << " " << output->column(0)->ToString();

Review Comment:
   We do have an `AssertArraysEqual` method that will compare two arrays (or even chunked arrays I think) and prints a more detailed diff if they are unequal that might be nice to use here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today, which indeed avoids accumulating the entire input in memory: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   At a high level this is how this new abstraction works:
   
   In the case of the asofjoin node when we have buffered enough data to create a new output batch, we create a `CompositeTableBuilder` local, which wraps this new `UnmaterializedCompositeTable`. This is pretty much the same as `CompositeReferenceTable` in the current implementation.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as we have `Materialize()`'d a record batch and sent it to the output node.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each output batch we intend to emit, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-38381: [C++][Acero] Create a sorted merge node [arrow]

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace merged PR #38380:
URL: https://github.com/apache/arrow/pull/38380


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1372442687


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;

Review Comment:
   Will remove this & `num_composite_tables` below. They aren't needed and are easy to forget to set anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383353017


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   I'm probably missing something but does this mean it will accumulate the entire input into memory?  One advantage of a sorted merge node is that it can process a very large stream of data, data that is larger than the memory on the device.  If we are keeping references to all the record batches then will we still be able to do that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373771336


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    if (!ptr2Ref.count((uintptr_t)ref.get())) {
+      ptr2Ref[(uintptr_t)ref.get()] = ref;
+    }
+  }
+  void AddSlice(const UnmaterializedSlice& slice) {
+    slices.push_back(slice);
+    num_rows += slice.Size();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_boolean<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+              Status> static BuilderAppend(Builder& builder,
+                                           const std::shared_ptr<ArrayData>& source,
+                                           uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    using CType = typename TypeTraits<Type>::CType;
+    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_base_binary<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      return builder.AppendNull();
+    }
+    using offset_type = typename Type::offset_type;
+    const uint8_t* data = source->buffers[2]->data();
+    const offset_type* offsets = source->GetValues<offset_type>(1);
+    const offset_type offset0 = offsets[row];
+    const offset_type offset1 = offsets[row + 1];
+    return builder.Append(data + offset0, offset1 - offset0);
+  }
+
+  template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+  arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+      const std::shared_ptr<arrow::DataType>& type, int i_col) {
+    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+    Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+    ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+
+    const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+    for (const auto& unmaterialized_slice : slices) {
+      const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+      if (batch) {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          arrow::Status st = BuilderAppend<Type, Builder>(
+              builder, batch->column_data(column_index), rowNum);
+          ARROW_RETURN_NOT_OK(st);
+        }
+      } else {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          ARROW_RETURN_NOT_OK(builder.AppendNull());
+        }
+      }
+    }
+    std::shared_ptr<arrow::Array> result;
+    ARROW_RETURN_NOT_OK(builder.Finish(&result));
+    if (result->length() == 5) {
+      ARROW_LOG(ERROR) << result->ToString();
+    }
+    return Result{std::move(result)};
+  }
+};
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder {
+ public:
+  explicit UnmaterializedSliceBuilder(
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>* table_)
+      : table(table_) {}
+
+  void AddEntry(std::shared_ptr<RecordBatch> rb, uint64_t start, uint64_t end) {
+    if (rb) {
+      table->AddRecordBatchRef(rb);

Review Comment:
   h/u on this new abstraction. I found having to call `AddRecordBatchRef(shared_ptr<RecordBatch>)` _and_ pulling out the raw pointer from the outside terribly unwieldy, so here's a new builder pattern 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373792311


##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input
+
+    bool active =
+        (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+    if (!active) {
+      return std::make_pair(UnmaterializedSlice(), nullptr);
+    }
+
+    row_index_t start = latest_ref_row_;
+    row_index_t end = latest_ref_row_;
+    time_unit_t startTime = GetLatestTime();
+    std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+    auto rows_in_batch = (row_index_t)batch->num_rows();
+
+    while (GetLatestTime() == startTime) {
+      end = ++latest_ref_row_;
+      if (latest_ref_row_ >= rows_in_batch) {
+        // hit the end of the batch, need to get the next batch if
+        // possible.
+        ++batches_processed_;
+        latest_ref_row_ = 0;
+        active &= !queue_.TryPop();
+        if (active) {
+          DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+                    0);  // empty batches disallowed, sanity check
+        }
+        break;
+      }
+    }
+
+    UnmaterializedSlice slice;
+    slice.num_components = 1;
+    slice.components[0] = CompositeEntry{batch.get(), start, end};
+    return std::make_pair(slice, batch);
+  }
+
+  arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+    if (rb->num_rows() > 0) {
+      queue_.Push(rb);
+    } else {
+      ++batches_processed_;  // don't enqueue empty batches, just record
+                             // as processed
+    }
+    return arrow::Status::OK();
+  }
+
+  const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+  void set_total_batches(int n) {
+    DCHECK_GE(n, 0);
+    DCHECK_EQ(total_batches_, -1) << "Set total batch more than once";
+    total_batches_ = n;
+  }
+
+ private:
+  size_t index_;
+  // Pending record batches. The latest is the front. Batches cannot be empty.
+  BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+  // Schema associated with the input
+  std::shared_ptr<arrow::Schema> schema_;
+  // Total number of batches (only int because InputFinished uses int)
+  std::atomic<int> total_batches_{-1};
+  // Number of batches processed so far (only int because InputFinished uses
+  // int)
+  std::atomic<int> batches_processed_{0};
+  // Index of the time col
+  col_index_t time_col_index_;
+  // Type id of the time column
+  arrow::Type::type time_type_id_;
+  // Index of the latest row reference within; if >0 then queue_ cannot be
+  // empty Must be < queue_.front()->num_rows() if queue_ is non-empty
+  row_index_t latest_ref_row_ = 0;
+  // Time of latest row
+  time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest();
+};
+
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    // True if lhs is ahead of time of rhs
+    if (lhs->Finished()) {
+      return false;
+    }
+    if (rhs->Finished()) {
+      return false;
+    }
+    time_unit_t lFirst = lhs->GetLatestTime();
+    time_unit_t rFirst = rhs->GetLatestTime();
+    return lFirst > rFirst;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+  static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+  SortedMergeNode(arrow::acero::ExecPlan* plan,
+                  std::vector<arrow::acero::ExecNode*> inputs,
+                  std::shared_ptr<arrow::Schema> output_schema,
+                  arrow::Ordering new_ordering)
+      : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+        ordering_(std::move(new_ordering)),
+        input_counter(inputs_.size()),
+        output_counter(inputs_.size()),
+        process_thread() {
+    SetLabel("sorted_merge");
+  }
+
+  ~SortedMergeNode() override {
+    process_queue.Push(
+        POISON_PILL);  // poison pill
+                       // We might create a temporary (such as to inspect the output
+                       // schema), in which case there isn't anything  to join
+    if (process_thread.joinable()) {
+      process_thread.join();
+    }
+  }
+
+  static arrow::Result<arrow::acero::ExecNode*> Make(
+      arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+      const arrow::acero::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()),
+                                         "SortedMergeNode"));
+
+    if (inputs.size() < 1) {
+      return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+    }
+
+    const auto schema = inputs.at(0)->output_schema();
+    for (const auto& input : inputs) {
+      if (!input->output_schema()->Equals(schema)) {
+        return Status::Invalid(
+            "SortedMergeNode input schemas must all "
+            "match, first schema "
+            "was: ",
+            schema->ToString(), " got schema: ", input->output_schema()->ToString());
+      }
+    }
+
+    const auto& order_options =
+        arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+
+    if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+      return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+    }
+
+    std::shared_ptr<Schema> output_schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<SortedMergeNode>(
+        plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+  }
+
+  const char* kind_name() const override { return "SortedMergeNode"; }
+
+  const arrow::Ordering& ordering() const override { return ordering_; }
+
+  arrow::Status Init() override {
+    auto inputs = this->inputs();
+    for (size_t i = 0; i < inputs.size(); i++) {
+      ExecNode* input = inputs[i];
+      const auto& schema = input->output_schema();
+      const auto& sort_key = ordering_.sort_keys()[0];
+      if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+        return Status::Invalid("Only ascending sort order is supported");
+      }
+
+      const auto& ref = sort_key.target;
+      if (!ref.IsName()) {
+        return Status::Invalid("Ordering must be a name. ", ref.ToString(),
+                               " is not a name");
+      }

Review Comment:
   Below I do this:
   
   ```
   ARROW_ASSIGN_OR_RAISE(auto input_state,
                               InputState::Make<std::shared_ptr<InputState>>(
                                   i, input, this, backpressure_counter, schema,
                                   schema->GetFieldIndex(*ref.name())));
   ```
   
   Not sure if there's an API that I could be using to avoid this check, would be nice to remove the constraint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   At a high level this is how this new abstraction works:
   
   In the case of the asofjoin node, we create a `CompositeTableBuilder` local, which wraps this object when we have buffered enough data to create a new output batch. This is pretty much the same as `CompositeReferenceTable` in the current implementation.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as we have `Materialize()`'d a record batch and sent it to the output node.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each output batch we intend to emit, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383729799


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Ah, thank you.  The piece I was missing was that we destruct the `UnmaterializedCompositeTable` after materialization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373771336


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    if (!ptr2Ref.count((uintptr_t)ref.get())) {
+      ptr2Ref[(uintptr_t)ref.get()] = ref;
+    }
+  }
+  void AddSlice(const UnmaterializedSlice& slice) {
+    slices.push_back(slice);
+    num_rows += slice.Size();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_boolean<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+              Status> static BuilderAppend(Builder& builder,
+                                           const std::shared_ptr<ArrayData>& source,
+                                           uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    using CType = typename TypeTraits<Type>::CType;
+    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_base_binary<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      return builder.AppendNull();
+    }
+    using offset_type = typename Type::offset_type;
+    const uint8_t* data = source->buffers[2]->data();
+    const offset_type* offsets = source->GetValues<offset_type>(1);
+    const offset_type offset0 = offsets[row];
+    const offset_type offset1 = offsets[row + 1];
+    return builder.Append(data + offset0, offset1 - offset0);
+  }
+
+  template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+  arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+      const std::shared_ptr<arrow::DataType>& type, int i_col) {
+    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+    Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+    ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+
+    const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+    for (const auto& unmaterialized_slice : slices) {
+      const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+      if (batch) {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          arrow::Status st = BuilderAppend<Type, Builder>(
+              builder, batch->column_data(column_index), rowNum);
+          ARROW_RETURN_NOT_OK(st);
+        }
+      } else {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          ARROW_RETURN_NOT_OK(builder.AppendNull());
+        }
+      }
+    }
+    std::shared_ptr<arrow::Array> result;
+    ARROW_RETURN_NOT_OK(builder.Finish(&result));
+    if (result->length() == 5) {
+      ARROW_LOG(ERROR) << result->ToString();
+    }
+    return Result{std::move(result)};
+  }
+};
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder {
+ public:
+  explicit UnmaterializedSliceBuilder(
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>* table_)
+      : table(table_) {}
+
+  void AddEntry(std::shared_ptr<RecordBatch> rb, uint64_t start, uint64_t end) {
+    if (rb) {
+      table->AddRecordBatchRef(rb);

Review Comment:
   h/u on this new abstraction. I found having the `AddRecordBatchRef` terribly unwieldy, so here's a new builder pattern 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373695176


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;
+
+  inline int64_t Size() const {
+    if (num_components == 0) {
+      return 0;
+    }
+    return components[0].end - components[0].start;
+  }
+};
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons, so the caller
+/// must manually call addRecordBatchRef to maintain the lifetime of the stored
+/// record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {

Review Comment:
   Somewhat. I have made some slight API implementation changes. The sorted merge node creates true slices of N rows at a time.
   
   Example: Suppose the next biggest in32 timestamp is 1234. We will:
   
   1. heapify our N inputs, and surface the input with ts 1234 as its next row
   2. We could emit just the "head" of the input. 
   3. ...But suppose the 1000 rows below are still 1234 (pretty common in real world data). We can greatly improve the speed and reduce redundant heapify calls if we just take the current row and the next 1k rows as well, and submit them as a "slice" of sorts
   
   OTOH, the asof join was just doing things row by row. So the main change is we now store a `record batch, [start, end)` range rather than a `record batch, row`
   
   Also, I added a builder abstraction (commented below) just now to hide some of the `shared_ptr` efficiency hacks away.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1377814155


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    if (!ptr2Ref.count((uintptr_t)ref.get())) {
+      ptr2Ref[(uintptr_t)ref.get()] = ref;
+    }
+  }

Review Comment:
   This block was copy/pasted from the asof implementation. Committing since I don't have a strong reason not to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373762959


##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();

Review Comment:
   It's currently only used for `share_ptr<RecordBatch>`. IIUC we actually need to copy these to to ensure the lifetime of the batches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373757021


##########
cpp/build-support/lint_cpp_cli.py:
##########
@@ -77,6 +77,7 @@ def lint_file(path):
 
 
 EXCLUSIONS = _paths('''\
+    arrow/acero/concurrent_queue.h

Review Comment:
   Went for option 2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373430790


##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue

Review Comment:
   ```suggestion
    * Simple implementation for a thread safe blocking unbound multi-consumer / multi-producer concurrent queue
   ```



##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();

Review Comment:
   Do we need a `std::move` somewhere?  Are we assuming this queue will only be used for trivially copyable values?



##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }
+
+ protected:
+  std::mutex& GetMutex() { return mutex_; }
+
+ private:
+  std::queue<T> queue_;
+  mutable std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+template <typename T>
+class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+ private:
+  struct DoHandle {
+    explicit DoHandle(BackpressureConcurrentQueue& queue)
+        : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+    ~DoHandle() {
+      size_t end_size = queue_.UnsyncSize();

Review Comment:
   Can we put a brief comment here that this is safe because `~DoHandle()` should always run while the lock is held?  Also, is it simpler to just make `queue_` protected?



##########
cpp/src/arrow/acero/time_series_util.cc:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/data.h"
+
+#include "arrow/acero/time_series_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+// normalize the value to 64-bits while preserving ordering of values

Review Comment:
   ```suggestion
   // normalize the value to unsigned 64-bits while preserving ordering of values
   ```
   
   Normalizing to 64 bits is easy.  The trick I think you are applying here is the fact that you are normalizing to an _unsigned_ value and so you have to fiddle with negative values so they compare correctly right?
   
   Also, we can probably remove this comment since you have the same comment in the header file.



##########
cpp/src/arrow/acero/sorted_merge_node_test.cc:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>

Review Comment:
   We generally try to avoid using the `api.h` headers internally as they are very large headers and it slows down compilation time.  It's sometimes done in tests but even then it is nice if we can avoid it.



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;

Review Comment:
   Hmm...I thought the template parameter `<1>` was the # of inputs and not the # of batches.  I could very easily be misunderstanding though.



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {

Review Comment:
   This could probably be an anonymous namespace?



##########
cpp/build-support/lint_cpp_cli.py:
##########
@@ -77,6 +77,7 @@ def lint_file(path):
 
 
 EXCLUSIONS = _paths('''\
+    arrow/acero/concurrent_queue.h

Review Comment:
   Generally we work around this by playing games (pimpl, virtual methods) to abstract details from the header files and keep `<mutex>` in the .cc files.  However, that won't work since you're dealing with a templated class.  There are two options I can think of.
   
   1. Make the `concurrent_queue.h` header "internal".  The lint check only applies to external headers which are those that are referenced (directly or transitively) in one of the `api.h` headers.  At the moment I'm not even sure how this check failed since you are only referencing the header from `asof_join_node.cc` so this seems a simple enough fix.
   2. You can use `src/arrow/util/mutex.h` instead of `<mutex>`.  It adds a small dynamic function penalty but hides the details of the `<mutex>` header.
   
   Either way, I hope we can avoid listing this file here.



##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }

Review Comment:
   Can these methods be protected or are they needed by the asof join node?



##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {

Review Comment:
   Can we document these methods?  Should the unlocked variants be public?



##########
cpp/src/arrow/acero/time_series_util.h:
##########
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// normalize the value to 64-bits while preserving ordering of values
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+inline uint64_t NormalizeTime(T t);
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row);

Review Comment:
   Can you document this?



##########
cpp/src/arrow/acero/exec_plan.cc:
##########
@@ -18,6 +18,7 @@
 #include "arrow/acero/exec_plan.h"
 
 #include <atomic>
+#include <iostream>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/acero/sorted_merge_node_test.cc:
##########
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <gtest/gtest.h>
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/test_nodes.h"
+#include "arrow/compute/ordering.h"
+#include "arrow/result.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::acero {
+
+std::shared_ptr<Table> TestTable(int start, int step, int rows_per_batch,
+                                 int num_batches) {
+  return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)},
+                   {"str", gen::Random(utf8())}})
+      ->FailOnError()
+      ->Table(rows_per_batch, num_batches);
+}
+
+void CheckMerging() {
+  auto table1 = TestTable(
+      /*start=*/0,
+      /*step=*/2,
+      /*rows_per_batch=*/2,
+      /*num_batches=*/3);
+  auto table2 = TestTable(
+      /*start=*/1,
+      /*step=*/2,
+      /*rows_per_batch=*/3,
+      /*num_batches=*/2);
+  auto table3 = TestTable(
+      /*start=*/3,
+      /*step=*/3,
+      /*rows_per_batch=*/6,
+      /*num_batches=*/1);
+  std::vector<Declaration::Input> src_decls;
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2)));
+  src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3)));
+
+  auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}));
+
+  Declaration sorted_merge{"sorted_merge", src_decls, ops};
+  // We can't use threads for sorted merging since it relies on
+  // ascending deterministic order of timestamps
+  ASSERT_OK_AND_ASSIGN(auto output,
+                       DeclarationToTable(sorted_merge, /*use_threads=*/false));
+  ASSERT_EQ(output->num_rows(), 18);
+
+  Int32Builder expected_ts_builder;
+  for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) {
+    ASSERT_OK(expected_ts_builder.Append(i));
+  }
+  ASSERT_OK_AND_ASSIGN(auto expected_timestamps, expected_ts_builder.Finish());
+  auto chunked_array =
+      std::make_shared<arrow::ChunkedArray>(std::move(expected_timestamps));
+  ASSERT_TRUE(chunked_array->Equals(output->column(0)))
+      << chunked_array->ToString() << " " << output->column(0)->ToString();
+}
+
+TEST(FetchNode, Basic) { CheckMerging(); }

Review Comment:
   Are you planning on adding more tests?  Why the indirection here?



##########
cpp/src/arrow/acero/time_series_util.h:
##########
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {

Review Comment:
   This change doesn't seem strictly related to a sorted merge node?  It's probably fine to do a bit of unrelated refactoring as part of this PR.  I just want to verify my understanding.



##########
cpp/src/arrow/acero/time_series_util.h:
##########
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// normalize the value to 64-bits while preserving ordering of values

Review Comment:
   ```suggestion
   // normalize the value to unsigned 64-bits while preserving ordering of values
   ```



##########
cpp/src/arrow/acero/time_series_util.h:
##########
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// normalize the value to 64-bits while preserving ordering of values
+template <typename T, enable_if_t<std::is_integral<T>::value, bool> = true>
+inline uint64_t NormalizeTime(T t);

Review Comment:
   Inline without definition?



##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;
+
+  inline int64_t Size() const {
+    if (num_components == 0) {
+      return 0;
+    }
+    return components[0].end - components[0].start;
+  }
+};
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons, so the caller
+/// must manually call addRecordBatchRef to maintain the lifetime of the stored
+/// record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {

Review Comment:
   I'm assuming this is just exported as-is from the asof join node?



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}

Review Comment:
   You've put other common helper utilities in their own files to avoid duplication.  Why not these?



##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {

Review Comment:
   Let's document what these are



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input

Review Comment:
   Time?  We're not dealing with time here right?



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false

Review Comment:
   Can we use constants instead of macros?  Also, then we can use `kNewTask` and `kPoisonPill`.



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input
+
+    bool active =
+        (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+    if (!active) {
+      return std::make_pair(UnmaterializedSlice(), nullptr);
+    }
+
+    row_index_t start = latest_ref_row_;
+    row_index_t end = latest_ref_row_;
+    time_unit_t startTime = GetLatestTime();
+    std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+    auto rows_in_batch = (row_index_t)batch->num_rows();
+
+    while (GetLatestTime() == startTime) {
+      end = ++latest_ref_row_;
+      if (latest_ref_row_ >= rows_in_batch) {
+        // hit the end of the batch, need to get the next batch if
+        // possible.
+        ++batches_processed_;
+        latest_ref_row_ = 0;
+        active &= !queue_.TryPop();
+        if (active) {
+          DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+                    0);  // empty batches disallowed, sanity check
+        }
+        break;
+      }
+    }
+
+    UnmaterializedSlice slice;
+    slice.num_components = 1;
+    slice.components[0] = CompositeEntry{batch.get(), start, end};
+    return std::make_pair(slice, batch);
+  }
+
+  arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+    if (rb->num_rows() > 0) {
+      queue_.Push(rb);
+    } else {
+      ++batches_processed_;  // don't enqueue empty batches, just record
+                             // as processed
+    }
+    return arrow::Status::OK();
+  }
+
+  const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+  void set_total_batches(int n) {
+    DCHECK_GE(n, 0);
+    DCHECK_EQ(total_batches_, -1) << "Set total batch more than once";
+    total_batches_ = n;
+  }
+
+ private:
+  size_t index_;
+  // Pending record batches. The latest is the front. Batches cannot be empty.
+  BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+  // Schema associated with the input
+  std::shared_ptr<arrow::Schema> schema_;
+  // Total number of batches (only int because InputFinished uses int)
+  std::atomic<int> total_batches_{-1};
+  // Number of batches processed so far (only int because InputFinished uses
+  // int)
+  std::atomic<int> batches_processed_{0};
+  // Index of the time col
+  col_index_t time_col_index_;
+  // Type id of the time column
+  arrow::Type::type time_type_id_;
+  // Index of the latest row reference within; if >0 then queue_ cannot be
+  // empty Must be < queue_.front()->num_rows() if queue_ is non-empty
+  row_index_t latest_ref_row_ = 0;
+  // Time of latest row
+  time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest();
+};
+
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    // True if lhs is ahead of time of rhs
+    if (lhs->Finished()) {
+      return false;
+    }
+    if (rhs->Finished()) {
+      return false;
+    }
+    time_unit_t lFirst = lhs->GetLatestTime();
+    time_unit_t rFirst = rhs->GetLatestTime();
+    return lFirst > rFirst;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+  static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+  SortedMergeNode(arrow::acero::ExecPlan* plan,
+                  std::vector<arrow::acero::ExecNode*> inputs,
+                  std::shared_ptr<arrow::Schema> output_schema,
+                  arrow::Ordering new_ordering)
+      : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+        ordering_(std::move(new_ordering)),
+        input_counter(inputs_.size()),
+        output_counter(inputs_.size()),
+        process_thread() {
+    SetLabel("sorted_merge");
+  }
+
+  ~SortedMergeNode() override {
+    process_queue.Push(
+        POISON_PILL);  // poison pill
+                       // We might create a temporary (such as to inspect the output
+                       // schema), in which case there isn't anything  to join
+    if (process_thread.joinable()) {
+      process_thread.join();
+    }
+  }
+
+  static arrow::Result<arrow::acero::ExecNode*> Make(
+      arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+      const arrow::acero::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()),
+                                         "SortedMergeNode"));
+
+    if (inputs.size() < 1) {
+      return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+    }
+
+    const auto schema = inputs.at(0)->output_schema();
+    for (const auto& input : inputs) {
+      if (!input->output_schema()->Equals(schema)) {
+        return Status::Invalid(
+            "SortedMergeNode input schemas must all "
+            "match, first schema "
+            "was: ",
+            schema->ToString(), " got schema: ", input->output_schema()->ToString());
+      }
+    }
+
+    const auto& order_options =
+        arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+
+    if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+      return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+    }
+
+    std::shared_ptr<Schema> output_schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<SortedMergeNode>(
+        plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+  }
+
+  const char* kind_name() const override { return "SortedMergeNode"; }
+
+  const arrow::Ordering& ordering() const override { return ordering_; }
+
+  arrow::Status Init() override {
+    auto inputs = this->inputs();
+    for (size_t i = 0; i < inputs.size(); i++) {
+      ExecNode* input = inputs[i];
+      const auto& schema = input->output_schema();
+      const auto& sort_key = ordering_.sort_keys()[0];
+      if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+        return Status::Invalid("Only ascending sort order is supported");
+      }
+
+      const auto& ref = sort_key.target;
+      if (!ref.IsName()) {
+        return Status::Invalid("Ordering must be a name. ", ref.ToString(),
+                               " is not a name");
+      }

Review Comment:
   Why?



##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input
+
+    bool active =
+        (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+    if (!active) {
+      return std::make_pair(UnmaterializedSlice(), nullptr);
+    }
+
+    row_index_t start = latest_ref_row_;
+    row_index_t end = latest_ref_row_;
+    time_unit_t startTime = GetLatestTime();
+    std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+    auto rows_in_batch = (row_index_t)batch->num_rows();
+
+    while (GetLatestTime() == startTime) {
+      end = ++latest_ref_row_;
+      if (latest_ref_row_ >= rows_in_batch) {
+        // hit the end of the batch, need to get the next batch if
+        // possible.
+        ++batches_processed_;
+        latest_ref_row_ = 0;
+        active &= !queue_.TryPop();
+        if (active) {
+          DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+                    0);  // empty batches disallowed, sanity check
+        }
+        break;
+      }
+    }
+
+    UnmaterializedSlice slice;
+    slice.num_components = 1;
+    slice.components[0] = CompositeEntry{batch.get(), start, end};
+    return std::make_pair(slice, batch);
+  }
+
+  arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+    if (rb->num_rows() > 0) {
+      queue_.Push(rb);
+    } else {
+      ++batches_processed_;  // don't enqueue empty batches, just record
+                             // as processed
+    }
+    return arrow::Status::OK();
+  }
+
+  const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+  void set_total_batches(int n) {
+    DCHECK_GE(n, 0);
+    DCHECK_EQ(total_batches_, -1) << "Set total batch more than once";
+    total_batches_ = n;
+  }
+
+ private:
+  size_t index_;
+  // Pending record batches. The latest is the front. Batches cannot be empty.
+  BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+  // Schema associated with the input
+  std::shared_ptr<arrow::Schema> schema_;
+  // Total number of batches (only int because InputFinished uses int)
+  std::atomic<int> total_batches_{-1};
+  // Number of batches processed so far (only int because InputFinished uses
+  // int)
+  std::atomic<int> batches_processed_{0};
+  // Index of the time col
+  col_index_t time_col_index_;
+  // Type id of the time column
+  arrow::Type::type time_type_id_;
+  // Index of the latest row reference within; if >0 then queue_ cannot be
+  // empty Must be < queue_.front()->num_rows() if queue_ is non-empty
+  row_index_t latest_ref_row_ = 0;
+  // Time of latest row
+  time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest();
+};
+
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    // True if lhs is ahead of time of rhs
+    if (lhs->Finished()) {
+      return false;
+    }
+    if (rhs->Finished()) {
+      return false;
+    }
+    time_unit_t lFirst = lhs->GetLatestTime();
+    time_unit_t rFirst = rhs->GetLatestTime();
+    return lFirst > rFirst;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+  static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+  SortedMergeNode(arrow::acero::ExecPlan* plan,
+                  std::vector<arrow::acero::ExecNode*> inputs,
+                  std::shared_ptr<arrow::Schema> output_schema,
+                  arrow::Ordering new_ordering)
+      : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+        ordering_(std::move(new_ordering)),
+        input_counter(inputs_.size()),
+        output_counter(inputs_.size()),
+        process_thread() {
+    SetLabel("sorted_merge");
+  }
+
+  ~SortedMergeNode() override {
+    process_queue.Push(
+        POISON_PILL);  // poison pill
+                       // We might create a temporary (such as to inspect the output
+                       // schema), in which case there isn't anything  to join
+    if (process_thread.joinable()) {
+      process_thread.join();
+    }
+  }
+
+  static arrow::Result<arrow::acero::ExecNode*> Make(
+      arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+      const arrow::acero::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()),
+                                         "SortedMergeNode"));
+
+    if (inputs.size() < 1) {
+      return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+    }
+
+    const auto schema = inputs.at(0)->output_schema();
+    for (const auto& input : inputs) {
+      if (!input->output_schema()->Equals(schema)) {
+        return Status::Invalid(
+            "SortedMergeNode input schemas must all "
+            "match, first schema "
+            "was: ",
+            schema->ToString(), " got schema: ", input->output_schema()->ToString());
+      }
+    }
+
+    const auto& order_options =
+        arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+
+    if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+      return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+    }
+
+    std::shared_ptr<Schema> output_schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<SortedMergeNode>(
+        plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+  }
+
+  const char* kind_name() const override { return "SortedMergeNode"; }
+
+  const arrow::Ordering& ordering() const override { return ordering_; }
+
+  arrow::Status Init() override {
+    auto inputs = this->inputs();
+    for (size_t i = 0; i < inputs.size(); i++) {
+      ExecNode* input = inputs[i];
+      const auto& schema = input->output_schema();
+      const auto& sort_key = ordering_.sort_keys()[0];
+      if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+        return Status::Invalid("Only ascending sort order is supported");
+      }

Review Comment:
   Maybe `NotSupported` instead of `Invalid` to indicate we can do it someday?  It seems we should eventually be able to support descending order easily.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373784091


##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input

Review Comment:
   The asof join node has a time-ish nomenclature (things like `time_unit_t`, for example), so I kind of followed that tradition. in my biased opinion this is easier in my head to picture/understand, but let me know if you want me to change the language.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1374994105


##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+#define NEW_TASK true
+#define POISON_PILL false
+
+class BackpressureController : public BackpressureControl {
+ public:
+  BackpressureController(ExecNode* node, ExecNode* output,
+                         std::atomic<int32_t>& backpressure_counter)
+      : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+  void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+  void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+  ExecNode* node_;
+  ExecNode* output_;
+  std::atomic<int32_t>& backpressure_counter_;
+};
+
+/// InputState correponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+  InputState(size_t index, BackpressureHandler handler,
+             const std::shared_ptr<arrow::Schema>& schema, const int time_col_index)
+      : index_(index),
+        queue_(std::move(handler)),
+        schema_(schema),
+        time_col_index_(time_col_index),
+        time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+  template <typename PtrType>
+  static arrow::Result<PtrType> Make(size_t index, arrow::acero::ExecNode* input,
+                                     arrow::acero::ExecNode* output,
+                                     std::atomic<int32_t>& backpressure_counter,
+                                     const std::shared_ptr<arrow::Schema>& schema,
+                                     const col_index_t time_col_index) {
+    constexpr size_t low_threshold = 4, high_threshold = 8;
+    std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
+        std::make_unique<BackpressureController>(input, output, backpressure_counter);
+    ARROW_ASSIGN_OR_RAISE(auto handler,
+                          BackpressureHandler::Make(input, low_threshold, high_threshold,
+                                                    std::move(backpressure_control)));
+    return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+  }
+
+  bool IsTimeColumn(col_index_t i) const {
+    DCHECK_LT(i, schema_->num_fields());
+    return (i == time_col_index_);
+  }
+
+  // Gets the latest row index, assuming the queue isn't empty
+  row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+  bool Empty() const {
+    // cannot be empty if ref row is >0 -- can avoid slow queue lock
+    // below
+    if (latest_ref_row_ > 0) {
+      return false;
+    }
+    return queue_.Empty();
+  }
+
+  size_t index() const { return index_; }
+
+  int total_batches() const { return total_batches_; }
+
+  // Gets latest batch (precondition: must not be empty)
+  const std::shared_ptr<arrow::RecordBatch>& GetLatestBatch() const {
+    return queue_.UnsyncFront();
+  }
+
+#define LATEST_VAL_CASE(id, val)                                   \
+  case arrow::Type::id: {                                          \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+    using CType = typename arrow::TypeTraits<T>::CType;            \
+    return val(data->GetValues<CType>(1)[row]);                    \
+  }
+
+  inline time_unit_t GetLatestTime() const {
+    return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+                   latest_ref_row_);
+  }
+
+#undef LATEST_VAL_CASE
+
+  bool Finished() const { return batches_processed_ == total_batches_; }
+
+  arrow::Result<std::pair<UnmaterializedSlice, std::shared_ptr<arrow::RecordBatch>>>
+  Advance() {
+    // Advance the row until a new time is encountered or the record batch
+    // ends. This will return a range of {-1, -1} and a nullptr if there is
+    // no input
+
+    bool active =
+        (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+    if (!active) {
+      return std::make_pair(UnmaterializedSlice(), nullptr);
+    }
+
+    row_index_t start = latest_ref_row_;
+    row_index_t end = latest_ref_row_;
+    time_unit_t startTime = GetLatestTime();
+    std::shared_ptr<arrow::RecordBatch> batch = queue_.UnsyncFront();
+    auto rows_in_batch = (row_index_t)batch->num_rows();
+
+    while (GetLatestTime() == startTime) {
+      end = ++latest_ref_row_;
+      if (latest_ref_row_ >= rows_in_batch) {
+        // hit the end of the batch, need to get the next batch if
+        // possible.
+        ++batches_processed_;
+        latest_ref_row_ = 0;
+        active &= !queue_.TryPop();
+        if (active) {
+          DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+                    0);  // empty batches disallowed, sanity check
+        }
+        break;
+      }
+    }
+
+    UnmaterializedSlice slice;
+    slice.num_components = 1;
+    slice.components[0] = CompositeEntry{batch.get(), start, end};
+    return std::make_pair(slice, batch);
+  }
+
+  arrow::Status Push(const std::shared_ptr<arrow::RecordBatch>& rb) {
+    if (rb->num_rows() > 0) {
+      queue_.Push(rb);
+    } else {
+      ++batches_processed_;  // don't enqueue empty batches, just record
+                             // as processed
+    }
+    return arrow::Status::OK();
+  }
+
+  const std::shared_ptr<arrow::Schema>& get_schema() const { return schema_; }
+
+  void set_total_batches(int n) {
+    DCHECK_GE(n, 0);
+    DCHECK_EQ(total_batches_, -1) << "Set total batch more than once";
+    total_batches_ = n;
+  }
+
+ private:
+  size_t index_;
+  // Pending record batches. The latest is the front. Batches cannot be empty.
+  BackpressureConcurrentQueue<std::shared_ptr<arrow::RecordBatch>> queue_;
+  // Schema associated with the input
+  std::shared_ptr<arrow::Schema> schema_;
+  // Total number of batches (only int because InputFinished uses int)
+  std::atomic<int> total_batches_{-1};
+  // Number of batches processed so far (only int because InputFinished uses
+  // int)
+  std::atomic<int> batches_processed_{0};
+  // Index of the time col
+  col_index_t time_col_index_;
+  // Type id of the time column
+  arrow::Type::type time_type_id_;
+  // Index of the latest row reference within; if >0 then queue_ cannot be
+  // empty Must be < queue_.front()->num_rows() if queue_ is non-empty
+  row_index_t latest_ref_row_ = 0;
+  // Time of latest row
+  time_unit_t latest_time_ = std::numeric_limits<time_unit_t>::lowest();
+};
+
+struct InputStateComparator {
+  bool operator()(const std::shared_ptr<InputState>& lhs,
+                  const std::shared_ptr<InputState>& rhs) const {
+    // True if lhs is ahead of time of rhs
+    if (lhs->Finished()) {
+      return false;
+    }
+    if (rhs->Finished()) {
+      return false;
+    }
+    time_unit_t lFirst = lhs->GetLatestTime();
+    time_unit_t rFirst = rhs->GetLatestTime();
+    return lFirst > rFirst;
+  }
+};
+
+class SortedMergeNode : public ExecNode {
+  static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+  SortedMergeNode(arrow::acero::ExecPlan* plan,
+                  std::vector<arrow::acero::ExecNode*> inputs,
+                  std::shared_ptr<arrow::Schema> output_schema,
+                  arrow::Ordering new_ordering)
+      : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+        ordering_(std::move(new_ordering)),
+        input_counter(inputs_.size()),
+        output_counter(inputs_.size()),
+        process_thread() {
+    SetLabel("sorted_merge");
+  }
+
+  ~SortedMergeNode() override {
+    process_queue.Push(
+        POISON_PILL);  // poison pill
+                       // We might create a temporary (such as to inspect the output
+                       // schema), in which case there isn't anything  to join
+    if (process_thread.joinable()) {
+      process_thread.join();
+    }
+  }
+
+  static arrow::Result<arrow::acero::ExecNode*> Make(
+      arrow::acero::ExecPlan* plan, std::vector<arrow::acero::ExecNode*> inputs,
+      const arrow::acero::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast<int>(inputs.size()),
+                                         "SortedMergeNode"));
+
+    if (inputs.size() < 1) {
+      return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+    }
+
+    const auto schema = inputs.at(0)->output_schema();
+    for (const auto& input : inputs) {
+      if (!input->output_schema()->Equals(schema)) {
+        return Status::Invalid(
+            "SortedMergeNode input schemas must all "
+            "match, first schema "
+            "was: ",
+            schema->ToString(), " got schema: ", input->output_schema()->ToString());
+      }
+    }
+
+    const auto& order_options =
+        arrow::internal::checked_cast<const OrderByNodeOptions&>(options);
+
+    if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+      return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+    }
+
+    std::shared_ptr<Schema> output_schema = inputs[0]->output_schema();
+    return plan->EmplaceNode<SortedMergeNode>(
+        plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+  }
+
+  const char* kind_name() const override { return "SortedMergeNode"; }
+
+  const arrow::Ordering& ordering() const override { return ordering_; }
+
+  arrow::Status Init() override {
+    auto inputs = this->inputs();
+    for (size_t i = 0; i < inputs.size(); i++) {
+      ExecNode* input = inputs[i];
+      const auto& schema = input->output_schema();
+      const auto& sort_key = ordering_.sort_keys()[0];
+      if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+        return Status::Invalid("Only ascending sort order is supported");
+      }
+
+      const auto& ref = sort_key.target;
+      if (!ref.IsName()) {
+        return Status::Invalid("Ordering must be a name. ", ref.ToString(),
+                               " is not a name");
+      }

Review Comment:
   Update -- learned about `FieldRef::FindOne`. Turns out asof_join uses this too, so swapping to that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1372414105


##########
cpp/build-support/lint_cpp_cli.py:
##########
@@ -77,6 +77,7 @@ def lint_file(path):
 
 
 EXCLUSIONS = _paths('''\
+    arrow/acero/concurrent_queue.h

Review Comment:
   `concurrent_queue.h` relies on `<mutex>` which fails the linter. LMK if there's a workaround aside from this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373695176


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+template <size_t MAX_COMPOSITE_TABLES>
+struct UnmaterializedSlice {
+  // A slice is represented by a [start, end) range of rows in a collection of record
+  // batches, where end-start is the same length
+
+  CompositeEntry components[MAX_COMPOSITE_TABLES];
+  size_t num_components;
+
+  inline int64_t Size() const {
+    if (num_components == 0) {
+      return 0;
+    }
+    return components[0].end - components[0].start;
+  }
+};
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons, so the caller
+/// must manually call addRecordBatchRef to maintain the lifetime of the stored
+/// record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {

Review Comment:
   Somewhat. I have made some slight API implementation changes. The sorted merge node creates true slices of N rows at a time.
   
   If the time is 1234, and the next 1000 rows are also 1234, we can greatly improve the speed and prevent unnecessary heapify calls if we just take the current row and the next 1k rows.
   
   OTOH, the asof join was just doing things row by row. So the main change is we now store a `record batch, [start, end)` range rather than a `record batch, row`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373704608


##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }

Review Comment:
   I'd like to leave these two around to limit the scope of this PR. I think it's a little hacky to be calling these but I'd need to put some thought about how to cleanly eradicate them from the asof join.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373771336


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,279 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+  std::vector<UnmaterializedSlice> slices;
+
+  size_t num_rows = 0;
+
+  // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+  friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+  void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+    if (!ptr2Ref.count((uintptr_t)ref.get())) {
+      ptr2Ref[(uintptr_t)ref.get()] = ref;
+    }
+  }
+  void AddSlice(const UnmaterializedSlice& slice) {
+    slices.push_back(slice);
+    num_rows += slice.Size();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_boolean<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+              Status> static BuilderAppend(Builder& builder,
+                                           const std::shared_ptr<ArrayData>& source,
+                                           uint64_t row) {
+    if (source->IsNull(row)) {
+      builder.UnsafeAppendNull();
+      return Status::OK();
+    }
+    using CType = typename TypeTraits<Type>::CType;
+    builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+    return Status::OK();
+  }
+
+  template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
+  enable_if_base_binary<Type, Status> static BuilderAppend(
+      Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
+    if (source->IsNull(row)) {
+      return builder.AppendNull();
+    }
+    using offset_type = typename Type::offset_type;
+    const uint8_t* data = source->buffers[2]->data();
+    const offset_type* offsets = source->GetValues<offset_type>(1);
+    const offset_type offset0 = offsets[row];
+    const offset_type offset1 = offsets[row + 1];
+    return builder.Append(data + offset0, offset1 - offset0);
+  }
+
+  template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+  arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+      const std::shared_ptr<arrow::DataType>& type, int i_col) {
+    ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+    Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+    ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+
+    const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+    for (const auto& unmaterialized_slice : slices) {
+      const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+      if (batch) {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          arrow::Status st = BuilderAppend<Type, Builder>(
+              builder, batch->column_data(column_index), rowNum);
+          ARROW_RETURN_NOT_OK(st);
+        }
+      } else {
+        for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+          ARROW_RETURN_NOT_OK(builder.AppendNull());
+        }
+      }
+    }
+    std::shared_ptr<arrow::Array> result;
+    ARROW_RETURN_NOT_OK(builder.Finish(&result));
+    if (result->length() == 5) {
+      ARROW_LOG(ERROR) << result->ToString();
+    }
+    return Result{std::move(result)};
+  }
+};
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder {
+ public:
+  explicit UnmaterializedSliceBuilder(
+      UnmaterializedCompositeTable<MAX_COMPOSITE_TABLES>* table_)
+      : table(table_) {}
+
+  void AddEntry(std::shared_ptr<RecordBatch> rb, uint64_t start, uint64_t end) {
+    if (rb) {
+      table->AddRecordBatchRef(rb);

Review Comment:
   h/u on this new abstraction. I found having to call `AddRecordBatchRef` from the outside terribly unwieldy, so here's a new builder pattern 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1377803714


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Only when the `UnmaterializedTable` is destructed. There's perhaps an argument that we can clear out when we materialize into a real table since we don't need the references after we create an output record batch, but I was wary of this approach (what if one day we want to materialize twice?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-38381: [C++][Acero] Create a sorted merge node [arrow]

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1797021664

   After merging your PR, Conbench analyzed the 5 benchmarking runs that have been run so far on merge-commit b55d8d5664480f63c2be75f0e18eeb006b640427.
   
   There were no benchmark performance regressions. 🎉
   
   The [full Conbench report](https://github.com/apache/arrow/runs/18421994881) has more details. It also includes information about 4 possible false positives for unstable benchmarks that are known to sometimes produce them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373709613


##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }
+
+ protected:
+  std::mutex& GetMutex() { return mutex_; }
+
+ private:
+  std::queue<T> queue_;
+  mutable std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+template <typename T>
+class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+ private:
+  struct DoHandle {
+    explicit DoHandle(BackpressureConcurrentQueue& queue)
+        : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+    ~DoHandle() {
+      size_t end_size = queue_.UnsyncSize();

Review Comment:
   Will add a comment and punt--want to limit the blast radius of changing these asofjoin primitives to just docs for the time being.



##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {
+    auto item = queue_.front();
+    queue_.pop();
+    return item;
+  }
+
+  void Push(const T& item) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return PushUnlocked(item);
+  }
+
+  void PushUnlocked(const T& item) {
+    queue_.push(item);
+    cond_.notify_one();
+  }
+
+  void Clear() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    ClearUnlocked();
+  }
+
+  void ClearUnlocked() { queue_ = std::queue<T>(); }
+
+  std::optional<T> TryPop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return TryPopUnlocked();
+  }
+
+  std::optional<T> TryPopUnlocked() {
+    // Try to pop the oldest value from the queue (or return nullopt if none)
+    if (queue_.empty()) {
+      return std::nullopt;
+    } else {
+      auto item = queue_.front();
+      queue_.pop();
+      return item;
+    }
+  }
+
+  bool Empty() const {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return queue_.empty();
+  }
+
+  // Un-synchronized access to front
+  // For this to be "safe":
+  // 1) the caller logically guarantees that queue is not empty
+  // 2) pop/try_pop cannot be called concurrently with this
+  const T& UnsyncFront() const { return queue_.front(); }
+
+  size_t UnsyncSize() const { return queue_.size(); }
+
+ protected:
+  std::mutex& GetMutex() { return mutex_; }
+
+ private:
+  std::queue<T> queue_;
+  mutable std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+template <typename T>
+class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+ private:
+  struct DoHandle {
+    explicit DoHandle(BackpressureConcurrentQueue& queue)
+        : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+    ~DoHandle() {
+      size_t end_size = queue_.UnsyncSize();

Review Comment:
   Will add a comment and punt on the implementation change--want to limit the blast radius of changing these asofjoin primitives to just docs for the time being.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1781871517

   > What's your thinking around the long term approach?
   > * Migrate this node and asof join node to act more like the other nodes (no independent threads, works even if plan is multi-threaded)
   > * Migrate the other nodes to be more like this node (all parallelism is within each node and we don't have any plan-level parallelism)
   > * Keep the status quote (two types of nodes)
   
   I think bullet one is a noble idea, if we could ensure no performance regressions. I lack historical context on why the asof_join_node actually needs a proper `std::thread`, but I do think that the asofjoin's compute model is quite elegant for these timeseries-ish operations. Perhaps @icexelloss can chime in?
   
   I think if we keep the status quo, it'd be nice to add one more layer of abstraction so that the node itself isn't needing to bookkeep a process queue, since we're adding state to an already-stateful class. But that's a high level idea and I don't have a clear implementation idea yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373691281


##########
cpp/src/arrow/acero/time_series_util.h:
##########
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {

Review Comment:
   Not strictly, but this function is used by both the sorted merge node and the asof join node. It's brief, but I think the unsigned int normalizing work is hairy enough to share between the two.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373703436


##########
cpp/src/arrow/acero/concurrent_queue.h:
##########
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation for an unbound concurrent queue
+ */
+template <class T>
+class ConcurrentQueue {
+ public:
+  T Pop() {
+    std::unique_lock<std::mutex> lock(mutex_);
+    cond_.wait(lock, [&] { return !queue_.empty(); });
+    return PopUnlocked();
+  }
+
+  T PopUnlocked() {

Review Comment:
   Will document and make unlocked blocks private--I was tentative to do more than cut/pasting where possible from the asofjoin node, but this is an easy one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373780764


##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}
+
+}  // namespace
+
+namespace arrow::acero {
+
+namespace sorted_merge {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using UnmaterializedSlice = arrow::acero::UnmaterializedSlice<1>;
+using UnmaterializedCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;

Review Comment:
   `<1>` is the number of batches per "slice". Since the sorted merge node is just plopping a single record batch, one slice on top of another, we only need a single record batch per slice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1373787258


##########
cpp/src/arrow/acero/sorted_merge_node.cc:
##########
@@ -0,0 +1,606 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/api.h>
+#include <atomic>
+#include <mutex>
+#include <sstream>
+#include <thread>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+#include "arrow/acero/concurrent_queue.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template <typename Callable>
+struct Defer {
+  Callable callable;
+  explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+  ~Defer() noexcept { callable(); }
+};
+
+std::vector<std::string> GetInputLabels(
+    const arrow::acero::ExecNode::NodeVector& inputs) {
+  std::vector<std::string> labels(inputs.size());
+  for (size_t i = 0; i < inputs.size(); i++) {
+    labels[i] = "input_" + std::to_string(i) + "_label";
+  }
+  return labels;
+}
+
+template <typename T, typename V = typename T::value_type>
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+  return std::find(container.begin(), container.end(), val);
+}
+
+template <typename T, typename V = typename T::value_type>
+inline bool std_has(const T& container, const V& val) {
+  return container.end() != std_find(container, val);
+}

Review Comment:
   No strong reason... I do like these helpers since they're terse, but I'm not sure where we'd put bits that are just a glorified wrapper around `std`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1782024162

   Ready for another round. TY for the feedback


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1787673184

   > Also, it looks like you've included some changes to the `testing` submodule, which are probably unintentional. You should be able to run `git submodule update` to clear these. Let me know if you have trouble and I can remove those changes real quick.
   
   Hrm, running this in `testing` is sadly not changing anything. Might need some assistance here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [GH-38381][C++][Acero] Create a sorted merge node [arrow]

Posted by "JerAguilon (via GitHub)" <gi...@apache.org>.
JerAguilon commented on code in PR #38380:
URL: https://github.com/apache/arrow/pull/38380#discussion_r1383714167


##########
cpp/src/arrow/acero/unmaterialized_table.h:
##########
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <optional>
+#include <vector>
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+  RecordBatch* batch;
+  uint64_t start;
+  uint64_t end;
+};
+
+// Forward declare the builder
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows.  Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented.  Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template <size_t MAX_COMPOSITE_TABLES>
+class UnmaterializedCompositeTable {
+ public:
+  UnmaterializedCompositeTable(
+      const std::shared_ptr<arrow::Schema>& output_schema, size_t num_composite_tables,
+      std::unordered_map<int, std::pair<int, int>> output_col_to_src_,
+      arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+      : schema(output_schema),
+        num_composite_tables(num_composite_tables),
+        output_col_to_src(std::move(output_col_to_src_)),
+        pool{pool_} {}
+
+  // Shallow wrappers around std::vector for performance
+  inline size_t capacity() { return slices.capacity(); }
+  inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+  inline size_t Size() const { return num_rows; }
+  inline size_t Empty() const { return num_rows == 0; }
+
+  Result<std::optional<std::shared_ptr<RecordBatch>>> Materialize() {
+    // Don't build empty batches
+    if (Empty()) {
+      return std::nullopt;
+    }
+    DCHECK_LE(Size(), (uint64_t)std::numeric_limits<int64_t>::max());
+    std::vector<std::shared_ptr<arrow::Array>> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id)                                                          \
+  case arrow::Type::id: {                                                             \
+    using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type;                    \
+    ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+    break;                                                                            \
+  }
+
+    // Build the arrays column-by-column from the rows
+    for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+      const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+      const auto& field_type = field->type();
+
+      switch (field_type->id()) {
+        MATERIALIZE_CASE(BOOL)
+        MATERIALIZE_CASE(INT8)
+        MATERIALIZE_CASE(INT16)
+        MATERIALIZE_CASE(INT32)
+        MATERIALIZE_CASE(INT64)
+        MATERIALIZE_CASE(UINT8)
+        MATERIALIZE_CASE(UINT16)
+        MATERIALIZE_CASE(UINT32)
+        MATERIALIZE_CASE(UINT64)
+        MATERIALIZE_CASE(FLOAT)
+        MATERIALIZE_CASE(DOUBLE)
+        MATERIALIZE_CASE(DATE32)
+        MATERIALIZE_CASE(DATE64)
+        MATERIALIZE_CASE(TIME32)
+        MATERIALIZE_CASE(TIME64)
+        MATERIALIZE_CASE(TIMESTAMP)
+        MATERIALIZE_CASE(STRING)
+        MATERIALIZE_CASE(LARGE_STRING)
+        MATERIALIZE_CASE(BINARY)
+        MATERIALIZE_CASE(LARGE_BINARY)
+        default:
+          return arrow::Status::Invalid("Unsupported data type ",
+                                        field->type()->ToString(), " for field ",
+                                        field->name());
+      }
+    }
+
+#undef MATERIALIZE_CASE
+
+    std::shared_ptr<arrow::RecordBatch> r =
+        arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+    return r;
+  }
+
+ private:
+  struct UnmaterializedSlice {
+    CompositeEntry components[MAX_COMPOSITE_TABLES];
+    size_t num_components;
+
+    inline int64_t Size() const {
+      if (num_components == 0) {
+        return 0;
+      }
+      return components[0].end - components[0].start;
+    }
+  };
+
+  // Mapping from an output column ID to a source table ID and column ID
+  std::shared_ptr<arrow::Schema> schema;
+  size_t num_composite_tables;
+  std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+  arrow::MemoryPool* pool;
+
+  /// A map from address of a record batch to the record batch. Used to
+  /// maintain the lifetime of the record batch in case it goes out of scope
+  /// by the main exec node thread
+  std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};

Review Comment:
   Good Q! This is actually a generalization/replacement of the `CompositeReferenceTable` in asofjoin today: https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/asof_join_node.cc#L1282
   
   The `CompositeReferenceTable` also stores a `ptr2ref` object, which you're right it will keep a set of record batches alive for its lifetime. However, these objects are only constructed ephemerally as locals while we output a batch of data.
   
   At a high level this is how this new abstraction works:
   
   In the case of the asofjoin node, we create a `CompositeTableBuilder` local, which wraps this object when we have buffered enough data to create a new output batch. This is pretty much the same as `CompositeReferenceTable` in the current implementation.
   
   In the case of sorted_merge, it's similar in that we wait until we have some data from all the inputs, and then ephemerally construct an `UnmaterializedCompositeTable` and destruct it as soon as we have `Materialize()`'d a record batch and sent it to the output node.
   
   What would be bad is if we kept the ptr2ref mapping alive longer than needed, but since we construct new instances of this class for each batch, record batches go out of scope appropriately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-38381: [C++][Acero] Create a sorted merge node [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #38380:
URL: https://github.com/apache/arrow/pull/38380#issuecomment-1795708246

   :warning: GitHub issue #38381 **has been automatically assigned in GitHub** to PR creator.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org