Posted to github@arrow.apache.org by "icexelloss (via GitHub)" <gi...@apache.org> on 2023/06/22 18:26:23 UTC

[GitHub] [arrow] icexelloss opened a new pull request, #36253: GH-36252: [Python] Compute hash aggregate udf

icexelloss opened a new pull request, #36253:
URL: https://github.com/apache/arrow/pull/36253

   ### Rationale for this change
   
   In https://github.com/apache/arrow/issues/35515, I implemented a scalar version of the non-decomposable UDF (scalar as in SCALAR_AGGREGATE). I would like to support the hash version of it as well (hash as in HASH_AGGREGATE).
   
   With this PR, a user can register an aggregate UDF once with `pc.register_aggregate_function`, and it can then be used as both a scalar aggregate function and a hash aggregate function.
   
   Example:
   
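   ```
   import numpy as np
   import pyarrow as pa
   import pyarrow.compute as pc
   
   # The UDF receives the context first, then the input array(s)
   def median(ctx, x):
       return pa.scalar(np.nanmedian(x))
   
   pc.register_aggregate_function(func=median, function_name='median_udf', ...)
   
   table = ...
   # aggregate() takes a list of (column, function) tuples
   table.group_by("id").aggregate([("v", "median_udf")])
   ```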
   
   
   ### What changes are included in this PR?
   
   The main changes are:
   * In RegisterAggregateFunction (udf.cc), we now register the function as both a scalar aggregate function and a hash aggregate function (with a signature adjustment for the hash aggregate kernel, because we need to append the grouping key).
   * Implemented PythonUdfHashAggregatorImpl, similar to PythonUdfScalarAggregatorImpl. In Consume, it accumulates both the input batches and the group id array. In Merge, it merges the input batches and the group id array (using the group_id_mapping). In Finalize, it applies groupings to the accumulated batches to create one record batch per group, then applies the UDF to each group.
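
The Consume / Merge / Finalize steps described above can be sketched in plain Python. This is only an illustration under stated assumptions: `HashAggregateState`, its method names, and the use of `statistics.median` as the UDF are hypothetical stand-ins, not the kernel code in udf.cc, and plain lists replace record batches and the group id buffer.

```python
from statistics import median


class HashAggregateState:
    """Toy stand-in for a hash-aggregate kernel state (hypothetical, not Arrow's API)."""

    def __init__(self):
        self.values = []      # accumulated input values, in arrival order
        self.group_ids = []   # group id of each accumulated value
        self.num_groups = 0

    def resize(self, new_num_groups):
        # Only the group count changes; nothing is allocated per group.
        self.num_groups = new_num_groups

    def consume(self, batch_values, batch_group_ids):
        # Buffer everything: a non-decomposable UDF cannot be applied incrementally.
        self.values.extend(batch_values)
        self.group_ids.extend(batch_group_ids)

    def merge(self, other, group_id_mapping):
        # The other state numbered its groups independently, so translate
        # each of its ids into this state's numbering before appending.
        self.values.extend(other.values)
        self.group_ids.extend(group_id_mapping[g] for g in other.group_ids)

    def finalize(self, udf):
        # Split the buffered values into one list per group,
        # then call the UDF once per group.
        per_group = [[] for _ in range(self.num_groups)]
        for value, g in zip(self.values, self.group_ids):
            per_group[g].append(value)
        return [udf(rows) for rows in per_group]


left = HashAggregateState()
left.resize(2)
left.consume([1.0, 5.0, 3.0], [0, 1, 0])    # left ids: 0 -> "a", 1 -> "b"

right = HashAggregateState()
right.resize(2)
right.consume([7.0, 9.0, 2.0], [0, 0, 1])   # right ids: 0 -> "b", 1 -> "c"

left.resize(3)                               # merged state knows "a", "b", "c"
left.merge(right, group_id_mapping=[1, 2])   # right's 0 -> 1 ("b"), 1 -> 2 ("c")
result = left.finalize(median)
```

Because every input row is kept until `finalize`, the memory held by the state grows with the total input, which is the trade-off discussed in the next paragraph.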
   
   For table.group_by().aggregate(...), the space complexity is O(n), where n is the size of the table, so the hash version is not very useful there. It is more useful in the segmented aggregation case, where the space complexity is O(s), where s is the size of a segment.
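
A rough way to see the O(s) bound is to process input that is already ordered by a segment key and to buffer only the current segment. The sketch below is an assumption-laden illustration in plain Python, not Acero's segmented aggregation: `segmented_group_median` and the sample rows are made up for this example.

```python
from itertools import groupby
from statistics import median


def segmented_group_median(rows):
    """rows: iterable of (segment, key, value), already ordered by segment."""
    peak_buffered = 0
    results = []
    for segment, segment_rows in groupby(rows, key=lambda r: r[0]):
        # Only the current segment is buffered; it is dropped before the next one.
        buffered = {}
        count = 0
        for _, key, value in segment_rows:
            buffered.setdefault(key, []).append(value)
            count += 1
        peak_buffered = max(peak_buffered, count)
        for key, values in buffered.items():
            results.append((segment, key, median(values)))
    return results, peak_buffered


rows = [
    ("2023-06-21", "a", 1.0),
    ("2023-06-21", "a", 3.0),
    ("2023-06-21", "b", 5.0),
    ("2023-06-22", "a", 2.0),
    ("2023-06-22", "b", 4.0),
]
results, peak_buffered = segmented_group_median(rows)
```

Here the peak number of buffered rows equals the largest segment (3), not the full input (5).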
   
   ### Are these changes tested?
   Added new tests in test_udf.py (with table.group_by().aggregate()) and test_substrait.py (with segmented aggregation).
   
   
   
   ### Are there any user-facing changes?
   Yes. With this change, users can call a registered aggregate UDF with `table.group_by().aggregate()` or Acero's segmented aggregation.
   
   ### Checklist
   
   [ ] Self Review
   [ ] API Documentation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1612218471

   Thanks @westonpace. I applied your suggestion and will merge once CI passes.




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Compute hash aggregate udf

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1238916255


##########
python/pyarrow/conftest.py:
##########
@@ -283,15 +285,14 @@ def unary_function(ctx, x):
 @pytest.fixture(scope="session")
 def unary_agg_func_fixture():
     """
-    Register a unary aggregate function
+    Register a unary aggregate function (mean)
     """
     from pyarrow import compute as pc
-    import numpy as np
 
-    def func(ctx, x):
+    def func(ctx, x, *args):

Review Comment:
   Revert this





[GitHub] [arrow] westonpace commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1240534567


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {

Review Comment:
   ```suggestion
       std::vector<std::shared_ptr<Field>> fields;
       fields.reserve(input_types.size());
       for (size_t i = 0; i < input_types.size(); i++) {
   ```
   
   Minor nit



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies

Review Comment:
   ```suggestion
     // same as ApplyGrouping in parition.cc
     // replicated the code here to avoid complicating the dependencies
   ```
   I'm not sure how valuable this comment is in general though.



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -15,16 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "arrow/python/udf.h"
+#include <iostream>

Review Comment:
   ```suggestion
   ```



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();

Review Comment:
   Does `num_groups` need to be updated here?



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));
+    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+
+    if (rb->num_rows() == 0) {
+      return Status::Invalid("Finalized is called with empty inputs");

Review Comment:
   Why is this a problem?



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);

Review Comment:
   Why not just `groups_array_data.GetValues<uint32_t>(1);`?



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));

Review Comment:
   There are some cases where this won't be possible. For example, an array of strings can hold at most 2GB of string data, regardless of how many elements it has, so any single group can't have more than 2GB of string data. I don't know that this is fatal, but you may want to mention it in the user docs somewhere or wrap this failure with extra context.
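
The 2GB figure follows from the 32-bit signed offsets used by Arrow's `string` type: the last offset must fit in an int32, so one array can address at most 2^31 - 1 bytes of string data. A minimal arithmetic sketch, where `fits_in_one_string_array` and the chunk sizes are hypothetical names and numbers chosen for illustration:

```python
INT32_MAX = 2**31 - 1  # largest offset a 32-bit-offset string array can store


def fits_in_one_string_array(chunk_byte_sizes):
    """Return True if chunks with these data sizes could be combined into one array."""
    return sum(chunk_byte_sizes) <= INT32_MAX


GIB = 2**30
small = [512 * 2**20, 512 * 2**20]   # 1 GiB in total
large = [GIB, GIB]                   # 2 GiB in total, one byte over the limit

small_ok = fits_in_one_string_array(small)
large_ok = fits_in_one_string_array(large)
```

With these sizes, `small_ok` is `True` and `large_ok` is `False`, so combining the second set of chunks into a single batch would be expected to fail.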



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -392,24 +569,109 @@ Status RegisterAggregateFunction(PyObject* agg_function, UdfWrapperCallback agg_
   }
   compute::OutputType output_type(options.output_type);
 
-  compute::KernelInit init = [agg_wrapper, agg_function, options](
-                                 compute::KernelContext* ctx,
-                                 const compute::KernelInitArgs& args)
+  compute::KernelInit init = [cb, function, options](compute::KernelContext* ctx,
+                                                     const compute::KernelInitArgs& args)
       -> Result<std::unique_ptr<compute::KernelState>> {
     return std::make_unique<PythonUdfScalarAggregatorImpl>(
-        agg_wrapper, std::make_shared<OwnedRefNoGIL>(agg_function), options.input_types,
+        std::make_shared<OwnedRefNoGIL>(function), cb, options.input_types,
         options.output_type);
   };
 
-  RETURN_NOT_OK(AddAggKernel(
-      compute::KernelSignature::Make(std::move(input_types), std::move(output_type),
-                                     options.arity.is_varargs),
-      init, aggregate_func.get()));
-
+  auto sig = compute::KernelSignature::Make(
+      std::move(input_types), std::move(output_type), options.arity.is_varargs);
+  compute::ScalarAggregateKernel kernel(std::move(sig), std::move(init),
+                                        AggregateUdfConsume, AggregateUdfMerge,
+                                        AggregateUdfFinalize, /*ordered=*/false);
+  RETURN_NOT_OK(aggregate_func->AddKernel(std::move(kernel)));
   RETURN_NOT_OK(registry->AddFunction(std::move(aggregate_func)));
   return Status::OK();
 }
 
+/// @brief Create a new UdfOptions with adjustment for hash kernel
+/// @param options User provided udf options

Review Comment:
   ```suggestion
   /// \brief Create a new UdfOptions with adjustment for hash kernel
   /// \param options User provided udf options
   ```





[GitHub] [arrow] icexelloss merged pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss merged PR #36253:
URL: https://github.com/apache/arrow/pull/36253




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1242545830


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));

Review Comment:
   Added note in the user doc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1608292843

   @westonpace This should be clean now - another look?




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1242281854


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();

Review Comment:
   I don't think `num_groups` needs to be updated here. Reasoning:
   
   From the code in https://github.com/apache/arrow/blob/main/cpp/src/arrow/compute/kernels/hash_aggregate.cc#L233 and https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/groupby_aggregate_node.cc#L248
   (1) Other hash kernel implementations update `num_groups` in `Resize`
   (2) `Resize` is always called before `Consume` and `Merge`
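
The reasoning above can be illustrated with a small, self-contained Python sketch. It is not Arrow's API: `HashAggState` and its methods are hypothetical stand-ins that only mirror the call order described here (the caller resizes first, then consumes or merges), and the group id mapping in the example is made up for illustration.

```python
class HashAggState:
    """Toy stand-in for a hash aggregate kernel state (hypothetical, not Arrow's API)."""

    def __init__(self):
        self.num_groups = 0
        self.group_ids = []  # one group id per accumulated row
        self.rows = []       # accumulated input values

    def resize(self, new_num_groups):
        # The only place where num_groups changes.
        self.num_groups = new_num_groups

    def consume(self, values, group_ids):
        # Accumulate the rows together with their group ids.
        self.rows.extend(values)
        self.group_ids.extend(group_ids)

    def merge(self, other, group_id_mapping):
        # The other state numbers its groups independently, so its ids
        # are translated into this state's numbering before appending.
        self.rows.extend(other.rows)
        self.group_ids.extend(group_id_mapping[g] for g in other.group_ids)


def run_example():
    a = HashAggState()
    a.resize(2)
    a.consume([1.0, 2.0, 3.0], [0, 1, 0])

    b = HashAggState()
    b.resize(2)
    b.consume([4.0, 5.0], [0, 1])

    # Assumed mapping: b's group 0 is a's group 1, b's group 1 is new (id 2).
    a.resize(3)  # the caller resizes before merging
    a.merge(b, [1, 2])
    return a
```

Because the caller has already resized the target state, `merge` only needs to translate ids and append rows; it never touches `num_groups`.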
   
   





[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1607588881

   > I think the only concerning thing is that we need to restrict group sizes to things that will fit in a single batch. Do you think this will be a problem for your use cases?
   
   Thanks @westonpace. Currently we only plan to use this with segmented aggregation (grouping inside a segment), so each group is not going to be very large and I don't think it would be a problem.




[GitHub] [arrow] westonpace commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1603486290

   @icexelloss I should have some time to take a look tomorrow




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Compute hash aggregate udf

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1238933343


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -355,18 +536,10 @@ Status RegisterTabularFunction(PyObject* user_function, UdfWrapperCallback wrapp
       wrapper, options, registry);
 }
 
-Status AddAggKernel(std::shared_ptr<compute::KernelSignature> sig,

Review Comment:
   This is inlined now.





[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1610290790

   Gentle ping @westonpace: anything else you want me to change here?




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1242545830


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));

Review Comment:
   Added note in the user doc: https://github.com/apache/arrow/pull/36253/files#diff-439f91c435cc8136d40eaba8c168aaa1cec00f08b10ba92ce376e47f29f81814R2770





[GitHub] [arrow] westonpace commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1245568309


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +247,164 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
   std::vector<std::shared_ptr<RecordBatch>> values;
-  std::shared_ptr<OwnedRefNoGIL> agg_function;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());

Review Comment:
   These `INCREF`'s still seem superfluous to me but I don't think it's critical.  We could test in a follow-up using temporary function registries to see if we are preventing UDF functions from being garbage collected.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2767,6 +2767,9 @@ def register_aggregate_function(func, function_name, function_doc, in_types, out
     This is often used with ordered or segmented aggregation where groups
     can be emit before accumulating all of the input data.
 
+    Note that currently size of any input column can not exceed 2 GB limit
+    (all groups combined).

Review Comment:
   ```suggestion
       Note that currently the size of any input column can not exceed 2 GB
       for a single segment (all groups combined).
   ```





[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1245902270


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +247,164 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
   std::vector<std::shared_ptr<RecordBatch>> values;
-  std::shared_ptr<OwnedRefNoGIL> agg_function;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());

Review Comment:
   I agree with you. I plan to address this in https://github.com/apache/arrow/issues/36000 but haven't got to it.





[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Compute hash aggregate udf

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1603311058

   @westonpace I would like to request a review of this PR. The code should be relatively straightforward and similar to https://github.com/apache/arrow/pull/35514, so hopefully there is no confusion or surprise here.
   
   For the implementation of the grouping, I used an approach similar to `GroupedListImpl` aggregator and `partition.cc`
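
As a rough illustration of that grouping approach, here is a plain-Python sketch. It is not the Arrow implementation: `make_groupings`, `apply_groupings`, and `finalize` are hypothetical helpers that only mimic the idea of turning a group id array into per-group row indices, slicing the accumulated column by group, and then calling the UDF once per group.

```python
from statistics import median


def make_groupings(group_ids, num_groups):
    """Return, for each group, the list of row indices that belong to it."""
    groupings = [[] for _ in range(num_groups)]
    for row, g in enumerate(group_ids):
        groupings[g].append(row)
    return groupings


def apply_groupings(groupings, column):
    """Gather the column values of each group into its own list."""
    return [[column[i] for i in rows] for rows in groupings]


def finalize(column, group_ids, num_groups, udf):
    """Apply the UDF once per group and return one result per group."""
    groupings = make_groupings(group_ids, num_groups)
    return [udf(values) for values in apply_groupings(groupings, column)]
```

For example, `finalize([1.0, 10.0, 3.0, 20.0, 5.0], [0, 1, 0, 1, 0], 2, median)` evaluates the median over rows 0, 2, 4 for group 0 and rows 1, 3 for group 1. A group with no rows would reach the UDF as an empty list, which this sketch does not handle.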
   
   For the registration, I decided to register both the scalar and hash kernels with one API, `register_aggregate_function`, because I think the scalar/hash difference is not really something users should worry about (from the user's point of view, it is just "aggregation"; whether it is hash or scalar is a compute implementation detail).
   
   Let me know if that sounds OK to you.




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Compute hash aggregate udf

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1238893501


##########
cpp/src/arrow/compute/row/grouper.h:
##########
@@ -149,8 +149,7 @@ class ARROW_EXPORT Grouper {
   ///       []
   ///   ]
   static Result<std::shared_ptr<ListArray>> MakeGroupings(
-      const UInt32Array& ids, uint32_t num_groups,
-      ExecContext* ctx = default_exec_context());
+      const UInt32Array& ids, uint32_t, ExecContext* ctx = default_exec_context());

Review Comment:
   Revert this change - accidental





[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1242305228


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));
+    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+
+    if (rb->num_rows() == 0) {
+      return Status::Invalid("Finalized is called with empty inputs");

Review Comment:
   Good catch - I was being lazy here and didn't want to bother with empty aggregation. But now that I look at this, I can just return an empty result here. Will update.
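
A minimal sketch of that change, in plain Python rather than the actual C++: the hypothetical `finalize` below returns an empty result when no rows were accumulated instead of raising an error. What "empty result" means in the real kernel (presumably an empty array of the output type) is an assumption here.

```python
from statistics import median


def finalize(column, group_ids, num_groups, udf):
    # No accumulated rows: return an empty result rather than raising.
    if not column:
        return []
    groups = [[] for _ in range(num_groups)]
    for value, g in zip(column, group_ids):
        groups[g].append(value)
    # One UDF call per group.
    return [udf(values) for values in groups]
```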





[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1620157196

   Conbench analyzed the 6 benchmark runs on commit `baf17a20`.
   
   There was 1 benchmark result with an error:
   
   - Commit Run on `ec2-m5-4xlarge-us-east-2` at [2023-06-29 16:54:37Z](http://conbench.ursa.dev/compare/runs/4b76489876c04b20820a0b5c82c25773...780e2359ab6e4549b648c3d33d38314f/)
     - [engine=arrow, format=parquet, language=R, memory_map=False, query_id=TPCH-15, scale_factor=10](http://conbench.ursa.dev/benchmark-results/0649dc96898b71538000a14b54e75437)
   
   There were no benchmark performance regressions. 🎉
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14766638630) has more details.




[GitHub] [arrow] icexelloss commented on a diff in pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #36253:
URL: https://github.com/apache/arrow/pull/36253#discussion_r1242545319


##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);

Review Comment:
   Good catch - updated



##########
python/pyarrow/src/arrow/python/udf.cc:
##########
@@ -215,9 +249,162 @@ struct PythonUdfScalarAggregatorImpl : public ScalarUdfAggregator {
     return Status::OK();
   }
 
-  UdfWrapperCallback agg_cb;
+  std::shared_ptr<OwnedRefNoGIL> function;
+  UdfWrapperCallback cb;
+  std::vector<std::shared_ptr<RecordBatch>> values;
+  std::shared_ptr<Schema> input_schema;
+  std::shared_ptr<DataType> output_type;
+};
+
+struct PythonUdfHashAggregatorImpl : public HashUdfAggregator {
+  PythonUdfHashAggregatorImpl(std::shared_ptr<OwnedRefNoGIL> function,
+                              UdfWrapperCallback cb,
+                              std::vector<std::shared_ptr<DataType>> input_types,
+                              std::shared_ptr<DataType> output_type)
+      : function(function), cb(std::move(cb)), output_type(std::move(output_type)) {
+    Py_INCREF(function->obj());
+    std::vector<std::shared_ptr<Field>> fields;
+    for (size_t i = 0; i < input_types.size(); i++) {
+      fields.push_back(field("", input_types[i]));
+    }
+    input_schema = schema(std::move(fields));
+  };
+
+  ~PythonUdfHashAggregatorImpl() override {
+    if (_Py_IsFinalizing()) {
+      function->detach();
+    }
+  }
+
+  /// @brief Same as ApplyGrouping in parition.cc
+  /// Replicated the code here to avoid complicating the dependencies
+  static Result<RecordBatchVector> ApplyGroupings(
+      const ListArray& groupings, const std::shared_ptr<RecordBatch>& batch) {
+    ARROW_ASSIGN_OR_RAISE(Datum sorted,
+                          compute::Take(batch, groupings.data()->child_data[0]));
+
+    const auto& sorted_batch = *sorted.record_batch();
+
+    RecordBatchVector out(static_cast<size_t>(groupings.length()));
+    for (size_t i = 0; i < out.size(); ++i) {
+      out[i] = sorted_batch.Slice(groupings.value_offset(i), groupings.value_length(i));
+    }
+
+    return out;
+  }
+
+  Status Resize(KernelContext* ctx, int64_t new_num_groups) {
+    // We only need to change num_groups in resize
+    // similar to other hash aggregate kernels
+    num_groups = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) {
+    ARROW_ASSIGN_OR_RAISE(
+        std::shared_ptr<RecordBatch> rb,
+        batch.ToExecBatch().ToRecordBatch(input_schema, ctx->memory_pool()));
+
+    // This is similar to GroupedListImpl
+    // last array is the group id
+    const ArraySpan& groups_array_data = batch[batch.num_values() - 1].array;
+    DCHECK_EQ(groups_array_data.offset, 0);
+    int64_t batch_num_values = groups_array_data.length;
+    const auto* batch_groups = groups_array_data.GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups.Append(batch_groups, batch_num_values));
+    values.push_back(std::move(rb));
+    num_values += batch_num_values;
+    return Status::OK();
+  }
+  Status Merge(KernelContext* ctx, KernelState&& other_state,
+               const ArrayData& group_id_mapping) {
+    // This is similar to GroupedListImpl
+    auto& other = checked_cast<PythonUdfHashAggregatorImpl&>(other_state);
+    auto& other_values = other.values;
+    const uint32_t* other_raw_groups = other.groups.data();
+    values.insert(values.end(), std::make_move_iterator(other_values.begin()),
+                  std::make_move_iterator(other_values.end()));
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other.num_values;
+         ++other_g) {
+      // Different state can have different group_id mappings, so we
+      // need to translate the ids
+      RETURN_NOT_OK(groups.Append(g[other_raw_groups[other_g]]));
+    }
+
+    num_values += other.num_values;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext* ctx, Datum* out) {
+    // Exclude the last column which is the group id
+    const int num_args = input_schema->num_fields() - 1;
+
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groupings,
+                          Grouper::MakeGroupings(UInt32Array(num_values, groups_buffer),
+                                                 static_cast<uint32_t>(num_groups)));
+
+    ARROW_ASSIGN_OR_RAISE(auto table,
+                          arrow::Table::FromRecordBatches(input_schema, values));
+    ARROW_ASSIGN_OR_RAISE(auto rb, table->CombineChunksToBatch(ctx->memory_pool()));
+    UdfContext udf_context{ctx->memory_pool(), table->num_rows()};
+
+    if (rb->num_rows() == 0) {
+      return Status::Invalid("Finalized is called with empty inputs");

Review Comment:
   Updated
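
The Merge and Finalize steps in the hunk above can be sketched in pure Python as follows. This is an illustration under stated assumptions, not the Arrow code: `translate_group_ids`, `make_groupings`, and `finalize` are hypothetical helper names, lists stand in for Arrow arrays, and `statistics.median` plays the role of the user-defined function. The sketch translates the other state's group ids through a mapping before appending them, then builds one list of row indices per group and applies the UDF to each group's rows.

```python
from statistics import median


def translate_group_ids(other_groups, group_id_mapping):
    # Each state numbers its groups independently, so the other state's ids
    # are looked up in the mapping before they are appended to ours.
    return [group_id_mapping[g] for g in other_groups]


def make_groupings(group_ids, num_groups):
    # One list of row indices per group, analogous to a list array of indices.
    groupings = [[] for _ in range(num_groups)]
    for row, g in enumerate(group_ids):
        groupings[g].append(row)
    return groupings


def finalize(column, group_ids, num_groups, udf):
    # Gather each group's rows and apply the UDF once per group.
    groupings = make_groupings(group_ids, num_groups)
    return [udf([column[r] for r in rows]) for rows in groupings]


# This state saw ids [0, 1, 0]. The other state used its own ids [0, 1],
# where its 0 is our group 1 and its 1 is a new group 2.
values = [1.0, 10.0, 3.0] + [20.0, 7.0]
groups = [0, 1, 0] + translate_group_ids([0, 1], [1, 2])
result = finalize(values, groups, 3, median)
```

Because the UDF only runs in the final step, it sees all rows of a group at once, which is what a non-decomposable aggregate such as a median needs.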





[GitHub] [arrow] icexelloss commented on pull request #36253: GH-36252: [Python] Add non decomposable hash aggregate UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #36253:
URL: https://github.com/apache/arrow/pull/36253#issuecomment-1613242212

   CI failure is unrelated. Merging.

