You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/22 08:42:02 UTC

[GitHub] [arrow] dhruv9vats opened a new pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

dhruv9vats opened a new pull request #12484:
URL: https://github.com/apache/arrow/pull/12484


   `hash_list` gathers the grouped values into a list array.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1049879462


   For `null`, yes, you just need to count how many values were seen.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823887982



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));

Review comment:
       What's not okay?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);

Review comment:
       ```suggestion
         const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0, 0);
   ```

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       Yes, `GetSet::AppendBuffers` should take `uint8_t*` uniformly; casting to `bool*` in one case is something we should avoid.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       Note you'll have to take the type as well, so you can compute the bit/byte width. That has the added bonus, however, that you _should_ be able to use it for FixedSizeBinaryType now too. (Though, it may be tricky to get everything else to line up in the templates.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r824820358



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -3129,6 +3537,9 @@ const FunctionDoc hash_one_doc{"Get one value from each group",
                                ("Null values are also returned."),
                                {"array", "group_id_array"}};
 
+const FunctionDoc hash_list_doc{"List array of all groups",

Review comment:
       "List all values in each group"?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -621,6 +622,12 @@ struct GroupedValueTraits {
 
   static CType Get(const CType* values, uint32_t g) { return values[g]; }
   static void Set(CType* values, uint32_t g, CType v) { values[g] = v; }
+  static Status AppendBuffers(TypedBufferBuilder<CType>* destination,
+                              const uint8_t* values, int64_t offset, int64_t num_values) {
+    RETURN_NOT_OK(
+        destination->Append(reinterpret_cast<const CType*>(values) + offset, num_values));

Review comment:
       This is why I mentioned passing the actual type here as well (`const DataType& type`), since you can use `FixedWidthType::bit_width` to compute the width (I believe there's a `byte_width` in the class hierarchy as well).

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));

Review comment:
       I assume the grouper never generates a group ID array with an offset; we can add a DCHECK to enforce this.

##########
File path: docs/source/cpp/compute.rst
##########
@@ -334,25 +334,27 @@ equivalents above and reflects how they are implemented internally.
 +-------------------------+-------+------------------------------------+------------------------+----------------------------------+-------+
 | hash_distinct           | Unary | Any                                | Input type             | :struct:`CountOptions`           | \(2)  |
 +-------------------------+-------+------------------------------------+------------------------+----------------------------------+-------+
+| hash_list               | Unary | Any                                | List value type        |                                  | \(3)  |

Review comment:
       Maybe make the output type "List of input type" to be clear.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -621,6 +622,12 @@ struct GroupedValueTraits {
 
   static CType Get(const CType* values, uint32_t g) { return values[g]; }
   static void Set(CType* values, uint32_t g, CType v) { values[g] = v; }
+  static Status AppendBuffers(TypedBufferBuilder<CType>* destination,
+                              const uint8_t* values, int64_t offset, int64_t num_values) {
+    RETURN_NOT_OK(
+        destination->Append(reinterpret_cast<const CType*>(values) + offset, num_values));

Review comment:
       You can look here for inspiration: https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/copy_data_internal.h
   
   …and frankly, you can consider just using those utilities directly! I forgot about that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1068077394


   Benchmark runs are scheduled for baseline = fbbe5361eff3c1601b7b5f228c52a662f4c124e4 and contender = 2e314cbee42c7cf25c8e30934e945f11a070e54b. 2e314cbee42c7cf25c8e30934e945f11a070e54b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/e91c151c6e0c444e918bff413044271e...52c35a4090a64edb89ead6afcea43d4e/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a64b046118794a98af2981ea395b4ad2...b1eb42950c194e3db852878336c8e316/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/ee72dd7929dd4f8ab9212f99fd58aaa2...e33a2d4e3e4347469d5ee75d97acf3ab/)
   [Finished :arrow_down:1.45% :arrow_up:0.38%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/83519cfc616a441b9d14a585ffeb447b...875075d516924f928312ea7646d76308/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826904427



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,406 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    DCHECK_EQ(groups_array_data->offset, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    DCHECK_EQ(groups_array_data->offset, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    num_args_ += num_values;
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    values_.insert(values_.end(), other->values_.begin(), other->values_.end());
+
+    const uint8_t* values_bitmap = other->values_bitmap_.data();
+    RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+        &values_bitmap_, values_bitmap, 0, other->num_args_));
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data =
+        ArrayData::Make(out_type_, num_args_, {std::move(null_bitmap_buffer), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(values_array_data.get(), values_));
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<util::optional<StringType>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    auto* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    offset_type total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (bit_util::GetBit(null_bitmap, i)) {
+        const util::optional<StringType>& value = values[i];
+        DCHECK(value.has_value());
+        if (value->size() >
+                static_cast<size_t>(std::numeric_limits<offset_type>::max()) ||
+            arrow::internal::AddWithOverflow(
+                total_length, static_cast<offset_type>(value->size()), &total_length)) {
+          return Status::Invalid("Result is too large to fit in ", *array->type,
+                                 " cast to large_ variant of type");
+        }
+      }
+      offsets[i] = total_length;
+    }
+    ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool()));
+    int64_t offset = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (bit_util::GetBit(null_bitmap, i)) {
+        const util::optional<StringType>& value = values[i];
+        DCHECK(value.has_value());
+        std::memcpy(data->mutable_data() + offset, value->data(), value->size());
+        offset += value->size();
+      }
+    }
+    array->buffers[1] = std::move(raw_offsets);
+    array->buffers.push_back(std::move(data));
+    return Status::OK();
+  }
+
+  template <typename T = Type>
+  enable_if_same<T, FixedSizeBinaryType, Status> MakeOffsetsValues(

Review comment:
       Yeah, if we did move FixedSizeBinary with the rest of the FixedSizeTypes, we could improve the string cases as well; we could directly build the final offsets/values buffers instead of storing `std::vector<std::string>`, which would avoid all the small allocations and the copy at the end.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826326399



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       So, the key insight is that FixedSizeBinaryType should actually go with all the other fixed-size types, not the string type. It only has a validity buffer and a data buffer, just like other fixed-size types. The only snag is that instead of using templates/casting to figure out how much data to copy, you have to use the type instance to know. That's what copy_data_utils.h also does, and why I suggest passing the type to AppendBuffers.
   
   …I guess that gets hard with VisitGroupedValues, though I think you can still make it work (you would just not use VisitGroupedValues; you'd visit the group ID array and work with offsets and uint8_t* only for fixed-size types). However, I think we're getting into the weeds. Let's return it to how it was before and fix up the cases of AppendBuffers that weren't working. Then we can think about whether we want to attempt further generalizations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm closed pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #12484:
URL: https://github.com/apache/arrow/pull/12484


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1047552262






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826324653



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(
+      const ExecBatch& batch) {
+    int64_t num_values = batch[0].array()->length;
+    num_args_ += num_values;
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  template <typename T = Type>
+  enable_if_t<std::is_same<T, FixedSizeBinaryType>::value, Status> AppendBuffers(
+      const ExecBatch& batch) {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    std::string str(
+        checked_cast<const FixedSizeBinaryType&>(*batch[0].type()).byte_width(), '0');
+    StringType s(str.data(), str.size(), allocator_);
+
+    //    values_.reserve(num_values * checked_cast<const
+    //    FixedSizeBinaryType&>(*batch[0].type()).byte_width());
+    values_.resize(values_.size() + num_values, s);
+    CopyDataUtils<FixedSizeBinaryType>::CopyData(

Review comment:
       Ah, I forgot it required you to preallocate data, hmm. In that case I might not bother with that; just note that you can use similar ideas to know how much you need to append in AppendBuffers, if that makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1068077394


   Benchmark runs are scheduled for baseline = fbbe5361eff3c1601b7b5f228c52a662f4c124e4 and contender = 2e314cbee42c7cf25c8e30934e945f11a070e54b. 2e314cbee42c7cf25c8e30934e945f11a070e54b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/e91c151c6e0c444e918bff413044271e...52c35a4090a64edb89ead6afcea43d4e/)
   [Finished :arrow_down:0.29% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a64b046118794a98af2981ea395b4ad2...b1eb42950c194e3db852878336c8e316/)
   [Finished :arrow_down:1.07% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/ee72dd7929dd4f8ab9212f99fd58aaa2...e33a2d4e3e4347469d5ee75d97acf3ab/)
   [Finished :arrow_down:1.45% :arrow_up:0.38%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/83519cfc616a441b9d14a585ffeb447b...875075d516924f928312ea7646d76308/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822454763



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       Can you elaborate more on the assumption you suggest here? Take, for example:
   ```cpp
   auto in_schema = schema({
         field("floats", float64()),
         //      field("nulls", null()),
         field("booleans", boolean()),
         field("decimal128", decimal128(3, 2)),
         field("decimal256", decimal256(3, 2)),
         field("fixed_binary", fixed_size_binary(3)),
         field("key", int64()),
     });
   auto table = TableFromJSON(in_schema, {R"([
       [null, true,   null,    null,    null,  1],
       [1.0,  true,   "1.01",  "1.01",  "aaa", 1]
   ])",
                                                R"([
       [0.0,   false, "0.00",  "0.00",  "bac", 2],
       [null,  false, null,    null,    null,  3],
       [4.0,   null,  "4.01",  "4.01",  "234", null],
       [3.25,  true,  "3.25",  "3.25",  "ddd", 1],
       [0.125, false, "0.12",  "0.12",  "bcd", 2]
   ])",
                                                R"([
       [-0.25, false, "-0.25", "-0.25", "bab", 2],
       [0.75,  true,  "0.75",  "0.75",  "123", null],
       [null,  true,  null,    null,    null,  3]
   ])"});
   ```
    The 2nd batch of the above table on my local machine is sometimes received in chunks as:
   ```cpp
   // with offset 0
    [0.0,   false, "0.00",  "0.00",  "bac", 2],
    [null,  false, null,    null,    null,  3],
   ```
   ```cpp
   // with offset 2
   [4.0,   null,  "4.01",  "4.01",  "234", null],
   [3.25,  true,  "3.25",  "3.25",  "ddd", 1],
   ```
   ```cpp
   // with offset 4
    [0.125, false, "0.12",  "0.12",  "bcd", 2]
   ```
   
   And the kernel works correctly _without_ DCHECK (and of course fails with it).

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -630,6 +635,12 @@ struct GroupedValueTraits<BooleanType> {
   static void Set(uint8_t* values, uint32_t g, bool v) {
     bit_util::SetBitTo(values, g, v);
   }
+  static Status AppendBuffers(TypedBufferBuilder<bool>& destination, const bool* values,
+                              int64_t num_values) {
+    RETURN_NOT_OK(destination.Reserve(num_values));
+    destination.UnsafeAppend(reinterpret_cast<const uint8_t*>(values), 0, num_values);
+    return Status::OK();
+  }

Review comment:
       Is this similar to what you had in mind?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823847455



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       Does the current structure look somewhat correct? Also, please suggest a more rigorous way of testing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r813900617



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();

Review comment:
       The same goes here: since we're dealing only with primitive types, is there a way to just concatenate the array's buffers with ours?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);

Review comment:
       This is probably as good as it gets

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       Or really: the `else` block is appending _the exact same values already in the other arrays_, so if we can just concatenate the two arrays and avoid the branching, that should presumably be much faster. (At least for values and values_bitmap, groups_ probably still has to be done element-by-element unfortunately. Even then, you could perhaps reserve enough capacity in groups_, then use TransposeInts to do it all in one go.)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);

Review comment:
       allocator_ is not a parameter of emplace in the first place. emplace/emplace_back forward their arguments so really it's part of the std::basic_string constructor. And hence allocator_ should still work here.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back(util::string_view{});
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       I think here we can just concatenate the vectors, as above.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       I would assume we want to preserve null values seen in the other group, no? If we do, is there a way to just concat two arrays instead of copying values one-by-one?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back(util::string_view{});

Review comment:
       Or just "".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r820457521



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }

Review comment:
       Is there a way to achieve for binary types what we have for primitive types, i.e., the append-whole-buffers logic?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }

Review comment:
       I did give `TransposeInts` a try, but it needs raw `uint8_t*` pointers. So if using it is preferred, how would one retrieve the raw pointers from `TypedBufferBuilder`? Doing something like:
   ```
   reinterpret_cast<uint8_t*>(groups_.data());
   ```
   throws this error:
   ```
   Reinterpret_cast from 'const unsigned int *' to 'uint8_t *' (aka 'unsigned char *') casts away qualifiers
   ```

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {

Review comment:
       Since we don't know whether future batches will contain `null`s, we append all `true`s here; but if no batch ever contains a `null`, this might be a waste.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));

Review comment:
       This line does not work for `bool` type. So if this append-whole-buffers approach is to be followed, should we make a different implementation for `bool`, inheriting from the current `GroupedListImpl`, like in CountDistinct/Distinct (but just overriding the `Consume` & `Merge` methods), or is there some conditional logic that can be used here?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));

Review comment:
       Had to explicitly call `StringType` to get `allocator_` to work. Is this appropriate?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822606737



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       The boolean type also has problems with `use_threads = true`:
   ```cpp
   [
     true,
     true,
     true,
     false,
     null,
     false,
     false,
     true,
     false,
     false
   ]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823833874



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       I am casting here and below because `AppendBuffers` takes `const CType*` while `data()` gives `uint8_t*`; when I tried overloading `AppendBuffers` for `uint8_t*`, there were two viable overloads for the `uint8_t` type, making the call ambiguous. So I just wanted to get your thoughts on this. Also, I have made `AppendBuffers` take `const CType*` and `const bool*` because `GetValues(1, 0)` (used in `Consume`) returns them this way.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    const auto* values = batch[0].array()->GetValues<CType>(1, 0);
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<bool>(0, 0);
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const auto* values = reinterpret_cast<const CType*>(other->values_.data());

Review comment:
       Should we consider using `buffers[1].data()` instead of `GetValues(1, 0)` and make `AppendBuffers` accept `uint8_t*` while doing the casting to `CType*` internally?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));

Review comment:
       Is this okay?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       I couldn't find an `Append` counterpart of https://github.com/apache/arrow/blob/55b7bb85de7bc66720367532407d8a86da693ddf/cpp/src/arrow/buffer_builder.h#L355-L361 for bool type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822585993



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       Is this somehow related to this?
   For columns 1 and 4 (zero-indexed) in the table above, with `use_exec_plan = true` I get an incorrect `values` array in the `Finalize` step:
   ```cpp
   // col 1 - boolean
   [
     true,
     true,
     false,
     false,
     null,
     null,
     false,
     false,
     true,
     false
   ]
   
   // col 4 fixed size binary
   [
     null,
     616161,
     626163,
     null,
     null,
     null,
     null,
     626162,
     313233,
     null
   ]
   ```
   
   While it appears to be correct with `use_exec_plan = false`:
   ```cpp
   // col 1 - boolean
   [
     true,
     true,
     false,
     false,
     null,
     true,
     false,
     false,
     true,
     true
   ]
   
   // col 4 - fixed sized binary
   [
     null,
     616161,
     626163,
     null,
     323334,
     646464,
     626364,
     626162,
     313233,
     null
   ]
   ```
   
   Am I handling these types incorrectly in the implementation? (These results are without the `DCHECK_EQ`.)
   What am I missing?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822673661



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       bitmaps should have offsets handled manually, something like this (untested)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -621,6 +622,11 @@ struct GroupedValueTraits {
 
   static CType Get(const CType* values, uint32_t g) { return values[g]; }
   static void Set(CType* values, uint32_t g, CType v) { values[g] = v; }
+  static Status AppendBuffers(TypedBufferBuilder<CType>& destination, const CType* values,

Review comment:
       nit: prefer raw pointers over mutable references (`TypedBufferBuilder<CType>*`), while the style guide we use no longer requires this, our codebase was written with this rule in place so let's be consistent

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       There's no point doing reserve-then-unsafe-append unless you know the entire size to reserve up front, because append will just do the reserve for you

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       `GetValues<CType>` naively tries to adjust the returned pointer to account for the offset, but it is unable to do this for the boolean type, since boolean values are bit-packed and a bit offset cannot be expressed as a pointer adjustment. So we should use `GetValues<CType>(1, 0)` to prevent it from adjusting the offset, and then our code needs to handle the offset itself (e.g. `AppendBuffers` needs to take an offset).

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    int64_t num_values = batch[1].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;

Review comment:
       Doesn't really seem that big a deal to me

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -630,6 +635,12 @@ struct GroupedValueTraits<BooleanType> {
   static void Set(uint8_t* values, uint32_t g, bool v) {
     bit_util::SetBitTo(values, g, v);
   }
+  static Status AppendBuffers(TypedBufferBuilder<bool>& destination, const bool* values,
+                              int64_t num_values) {
+    RETURN_NOT_OK(destination.Reserve(num_values));
+    destination.UnsafeAppend(reinterpret_cast<const uint8_t*>(values), 0, num_values);
+    return Status::OK();
+  }

Review comment:
       Yes.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       Well then the assumption is incorrect, and we need to handle offsets.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       ```suggestion
         const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0, 0);
         RETURN_NOT_OK(values_bitmap_.Append(values_bitmap, batch[0].array()->offset, num_values));
   ```

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       Also, this is probably what's causing the issues down below for fixed-size binary; the fact that other types didn't fail just means we have a gap in our testing.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       Also, maybe consider saving a `const auto& values = *batch[0].array();` up top so we aren't unboxing this repeatedly (though the compiler probably optimizes that) and so it's clear what `batch[0]` is (I always forget).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826595048



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       > So, the key insight is that FixedSizeBinaryType should actually go with all the other fixed-size types, not the string type.
   
   I did not catch this before. This makes a lot more sense now.
   But again, in the current use case, where we append string-like types to a vector instead of a `BufferBuilder`, it won't become fully generic, I guess?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       Also, `MakeOffsetsValues` for `FixedSizeBinaryType` does basically what we want, just element-wise, so modifying it to suit the current needs could be a potential lead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823892107



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2770,336 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    //    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    int64_t num_values = batch[0].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);

Review comment:
       This seems on the right track.
   
   This is generally what we do for tests; it quickly gets annoying to test different permutations - we probably need to come up with a reasonable test helper.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826897974



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -37,6 +39,7 @@
 #include "arrow/compute/kernels/aggregate_internal.h"
 #include "arrow/compute/kernels/aggregate_var_std_internal.h"
 #include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/copy_data_internal.h"

Review comment:
       Are we using this header?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,406 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();

Review comment:
       nit: `const auto&` to avoid the copy

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       `FixedSizeBinaryType` doesn't have an offsets buffer. That's why it fits with the fixed-size types. It _is_ for all intents and purposes the same as an array of integers - it's just that the element width isn't fixed at compile time, and you don't need `VisitGroupedValues` or anything for the fixed-size types.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,406 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();

Review comment:
       `const auto&` here too




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r811834476



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);

Review comment:
       What would be a more concise way to do this?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back(util::string_view{});

Review comment:
       Is this appropriate?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);

Review comment:
       I have simply used `emplace_back` here, as opposed to the `emplace` used in:
   https://github.com/apache/arrow/blob/5216c2bb3e4f9298230e2e38f29ff9889b5a152d/cpp/src/arrow/compute/kernels/hash_aggregate.cc#L1853
   Is this valid, or will something like resizing and then using `emplace` have to be used? (I ask because `allocator_` is not used explicitly in `emplace_back`.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r824093460



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2939,26 +3287,26 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
     ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
     groupers[thread_index].reset();
 
-    for (size_t i = 0; i < kernels.size(); ++i) {
+    for (size_t idx = 0; idx < kernels.size(); ++idx) {

Review comment:
       Just a nit, but my clang-tidy kept showing me that this `Declaration shadows a local variable`. I will revert it if it is not needed.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2771,343 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    int64_t num_values = batch[0].array()->length;
+    int64_t offset = batch[0].array()->offset;
+
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));

Review comment:
       That `groups_.Append` is working correctly without passing the offset, even though we use `GetValues<uint32_t>(1, 0)`?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -621,6 +622,12 @@ struct GroupedValueTraits {
 
   static CType Get(const CType* values, uint32_t g) { return values[g]; }
   static void Set(CType* values, uint32_t g, CType v) { values[g] = v; }
+  static Status AppendBuffers(TypedBufferBuilder<CType>* destination,
+                              const uint8_t* values, int64_t offset, int64_t num_values) {
+    RETURN_NOT_OK(
+        destination->Append(reinterpret_cast<const CType*>(values) + offset, num_values));

Review comment:
       I am currently accounting for the type size with this cast. I will have to see how to extend this to (fixed-size) binary types.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r820713662



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       I would maybe `DCHECK_EQ(batch[0].array()->offset, 0);` just to be safe (and make explicit the assumption here)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }

Review comment:
       There's `mutable_data()` too.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {

Review comment:
       You can check `batch[0].GetNullCount()` or whatever the method is, and only allocate a bitmap if there are any nulls.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));

Review comment:
       should be fine

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }

Review comment:
       Also, `TransposeInts` is templated and should be instantiated for uint32_t too. However, it's not that big a deal and this loop is fine. All the other kernels have something like this anyways.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));

Review comment:
       Er, aren't we using a `nullptr` value here? Shouldn't we allocate a bitmap first?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }

Review comment:
       Probably, but I think we'd have to specialize fixed-width binary separately from variable-width binary.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));

Review comment:
       You can extend this trait: https://github.com/apache/arrow/blob/253303ec96499d4b1e333d5c7e7783f3c96de78b/cpp/src/arrow/compute/kernels/hash_aggregate.cc#L615-L633




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826907452



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       Right now we use `TypedBufferBuilder<CType>` for fixed-width ctypes. How would we extend this to `FixedSizeBinaryType`? Maybe by using the basic `BufferBuilder` and doing the reserving/appending similar to `CopyData`? Should I go ahead and try to integrate `FixedSizeBinaryType` with the other fixed-width types?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r823845893



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));

Review comment:
       I guess `values_bitmap` and `values_bitmap_` were different. 🤔  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826325068



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));

Review comment:
       DecimalType is just a special case of FixedSizeBinaryType as far as we're concerned here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1068077394


   Benchmark runs are scheduled for baseline = fbbe5361eff3c1601b7b5f228c52a662f4c124e4 and contender = 2e314cbee42c7cf25c8e30934e945f11a070e54b. 2e314cbee42c7cf25c8e30934e945f11a070e54b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/e91c151c6e0c444e918bff413044271e...52c35a4090a64edb89ead6afcea43d4e/)
   [Finished :arrow_down:0.29% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a64b046118794a98af2981ea395b4ad2...b1eb42950c194e3db852878336c8e316/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/ee72dd7929dd4f8ab9212f99fd58aaa2...e33a2d4e3e4347469d5ee75d97acf3ab/)
   [Finished :arrow_down:1.45% :arrow_up:0.38%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/83519cfc616a441b9d14a585ffeb447b...875075d516924f928312ea7646d76308/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r827054779



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,406 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    DCHECK_EQ(groups_array_data->offset, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    DCHECK_EQ(groups_array_data->offset, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    num_args_ += num_values;
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    values_.insert(values_.end(), other->values_.begin(), other->values_.end());
+
+    const uint8_t* values_bitmap = other->values_bitmap_.data();
+    RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+        &values_bitmap_, values_bitmap, 0, other->num_args_));
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data =
+        ArrayData::Make(out_type_, num_args_, {std::move(null_bitmap_buffer), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(values_array_data.get(), values_));
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<util::optional<StringType>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    auto* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();
+    offset_type total_length = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (bit_util::GetBit(null_bitmap, i)) {
+        const util::optional<StringType>& value = values[i];
+        DCHECK(value.has_value());
+        if (value->size() >
+                static_cast<size_t>(std::numeric_limits<offset_type>::max()) ||
+            arrow::internal::AddWithOverflow(
+                total_length, static_cast<offset_type>(value->size()), &total_length)) {
+          return Status::Invalid("Result is too large to fit in ", *array->type,
+                                 " cast to large_ variant of type");
+        }
+      }
+      offsets[i] = total_length;
+    }
+    ARROW_ASSIGN_OR_RAISE(auto data, AllocateBuffer(total_length, ctx_->memory_pool()));
+    int64_t offset = 0;
+    for (size_t i = 0; i < values.size(); i++) {
+      if (bit_util::GetBit(null_bitmap, i)) {
+        const util::optional<StringType>& value = values[i];
+        DCHECK(value.has_value());
+        std::memcpy(data->mutable_data() + offset, value->data(), value->size());
+        offset += value->size();
+      }
+    }
+    array->buffers[1] = std::move(raw_offsets);
+    array->buffers.push_back(std::move(data));
+    return Status::OK();
+  }
+
+  template <typename T = Type>
+  enable_if_same<T, FixedSizeBinaryType, Status> MakeOffsetsValues(

Review comment:
       Actually, I'm wrong here — I always forget about the group IDs getting in the way of that…




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r826909260



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       Everything would work on a `TypedBufferBuilder<uint8_t>`, and would work like `CopyData`, yes. But I think we should just clean this up, since it already works. We can file a separate Jira if you'd like to keep looking at optimizations (I think this would be faster for Decimal/FixedSizeBinary, but we should have benchmarks to be sure).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot commented on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1068077394


   Benchmark runs are scheduled for baseline = fbbe5361eff3c1601b7b5f228c52a662f4c124e4 and contender = 2e314cbee42c7cf25c8e30934e945f11a070e54b. 2e314cbee42c7cf25c8e30934e945f11a070e54b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Scheduled] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/e91c151c6e0c444e918bff413044271e...52c35a4090a64edb89ead6afcea43d4e/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a64b046118794a98af2981ea395b4ad2...b1eb42950c194e3db852878336c8e316/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/ee72dd7929dd4f8ab9212f99fd58aaa2...e33a2d4e3e4347469d5ee75d97acf3ab/)
   [Scheduled] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/83519cfc616a441b9d14a585ffeb447b...875075d516924f928312ea7646d76308/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1068077394


   Benchmark runs are scheduled for baseline = fbbe5361eff3c1601b7b5f228c52a662f4c124e4 and contender = 2e314cbee42c7cf25c8e30934e945f11a070e54b. 2e314cbee42c7cf25c8e30934e945f11a070e54b is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/e91c151c6e0c444e918bff413044271e...52c35a4090a64edb89ead6afcea43d4e/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a64b046118794a98af2981ea395b4ad2...b1eb42950c194e3db852878336c8e316/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/ee72dd7929dd4f8ab9212f99fd58aaa2...e33a2d4e3e4347469d5ee75d97acf3ab/)
   [Scheduled] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/83519cfc616a441b9d14a585ffeb447b...875075d516924f928312ea7646d76308/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#issuecomment-1049832936


   Will probably use something similar to `hash_count` for the `null` type. Is this the correct general direction, @lidavidm?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822588437



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    int64_t num_values = batch[1].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    RETURN_NOT_OK(values_.Append(other->values_.data(), other->num_args_));
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+      values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    values_.insert(values_.end(), other->values_.begin(), other->values_.end());
+    RETURN_NOT_OK(values_bitmap_.Reserve(other->num_args_));
+    values_bitmap_.UnsafeAppend(other->values_bitmap_.data(), 0, other->num_args_);
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data =
+        ArrayData::Make(out_type_, num_args_, {std::move(null_bitmap_buffer), nullptr});
+    RETURN_NOT_OK(MakeOffsetsValues(values_array_data.get(), values_));
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  template <typename T = Type>
+  enable_if_base_binary<T, Status> MakeOffsetsValues(
+      ArrayData* array, const std::vector<util::optional<StringType>>& values) {
+    using offset_type = typename T::offset_type;
+    ARROW_ASSIGN_OR_RAISE(
+        auto raw_offsets,
+        AllocateBuffer((1 + values.size()) * sizeof(offset_type), ctx_->memory_pool()));
+    auto* offsets = reinterpret_cast<offset_type*>(raw_offsets->mutable_data());
+    offsets[0] = 0;
+    offsets++;
+    const uint8_t* null_bitmap = array->buffers[0]->data();

Review comment:
       This is the reason I've not included the optional-null-bitmap logic in the binary type implementation. I've refrained from altering these `MakeOffsetsValues` methods as they are carry-overs from MinMax.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -2747,6 +2747,288 @@ TEST(GroupBy, OneScalar) {
   }
 }
 
+TEST(GroupBy, ListNumeric) {
+  for (const auto& type : NumericTypes()) {
+    for (auto use_threads : {true, false}) {
+      SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+      {  // With nulls
+        SCOPED_TRACE("with nulls");
+        auto table =
+            TableFromJSON(schema({field("argument", type), field("key", int64())}), {R"([
+    [99,  1],
+    [99,  1]
+])",
+                                                                                     R"([
+    [88,  2],
+    [null,   3],
+    [null,   3]
+])",
+                                                                                     R"([
+    [null,   4],
+    [null,   4]
+])",
+                                                                                     R"([
+    [77,  null],
+    [99,  3]
+])",
+                                                                                     R"([
+    [88,  2],
+    [66, 2]
+])",
+                                                                                     R"([
+    [55, null],
+    [44,  3]
+  ])",
+                                                                                     R"([
+    [33,    null],
+    [22,    null]
+  ])"});
+
+        ASSERT_OK_AND_ASSIGN(auto aggregated_and_grouped,
+                             internal::GroupBy(
+                                 {
+                                     table->GetColumnByName("argument"),
+                                 },
+                                 {
+                                     table->GetColumnByName("key"),
+                                 },
+                                 {
+                                     {"hash_list", nullptr},
+                                 },
+                                 use_threads));
+        ValidateOutput(aggregated_and_grouped);
+        SortBy({"key_0"}, &aggregated_and_grouped);
+
+        // Order of sub-arrays is not stable
+        auto sort = [](const Array& arr) -> std::shared_ptr<Array> {
+          EXPECT_OK_AND_ASSIGN(auto indices, SortIndices(arr));
+          EXPECT_OK_AND_ASSIGN(auto sorted, Take(arr, indices));
+          return sorted.make_array();
+        };
+
+        auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
+
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        AssertDatumsEqual(ArrayFromJSON(type, R"([99, 99])"),
+                          sort(*list_arr->value_slice(0)),
+                          /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([66, 88, 88])"),
+                          sort(*list_arr->value_slice(1)), /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([44, 99, null, null])"),
+                          sort(*list_arr->value_slice(2)), /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([null, null])"),
+                          sort(*list_arr->value_slice(3)),
+                          /*verbose=*/true);
+        AssertDatumsEqual(ArrayFromJSON(type, R"([22, 33, 55, 77])"),
+                          sort(*list_arr->value_slice(4)), /*verbose=*/true);
+      }
+      {  // Without nulls

Review comment:
       Added these for the time being to test the optional-null-bitmap logic. Will remove or consolidate them later as necessary.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2769,333 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    DCHECK_EQ(batch[0].array()->offset, 0);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    int64_t num_values = batch[1].array()->length;
+
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(GetSet::AppendBuffers(values_, values, num_values));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;

Review comment:
       This seems a little messy at first sight. Are there better ways to do this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r822585993



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);

Review comment:
       Is this somehow related to this?
   For columns 1 and 4 (zero indexed) in the table above for `use_exec_plan = true` I get:
   ```cpp
   // col 1 - boolean
   [
     true,
     true,
     false,
     false,
     null,
     null,
     false,
     false,
     true,
     false
   ]
   
   // col 4 fixed size binary
   [
     null,
     616161,
     626163,
     null,
     null,
     null,
     null,
     626162,
     313233,
     null
   ]
   ```
   
   While it appears to be correct with `use_exec_plan = false`:
   ```cpp
   // col 1 - boolean
   [
     true,
     true,
     false,
     false,
     null,
     true,
     false,
     false,
     true,
     true
   ]
   
   // col 4 - fixed sized binary
   [
     null,
     616161,
     626163,
     null,
     323334,
     646464,
     626364,
     626162,
     313233,
     null
   ]
   ```
   
   Am I handling these types incorrectly in the implementation? (These results are without the `DCHECK_EQ`.)
   What am I missing?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r820456106



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,317 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto* groups = batch[1].array()->GetValues<uint32_t>(1);
+    const auto* values = batch[0].array()->GetValues<CType>(1);
+    const auto* values_bitmap = batch[0].array()->GetValues<uint8_t>(0);
+    int64_t num_values = batch[1].array()->length;
+
+    num_args_ += num_values;
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+    RETURN_NOT_OK(values_.Append(values, num_values));
+
+    if (values_bitmap == nullptr) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      RETURN_NOT_OK(values_bitmap_.Reserve(num_values));
+      values_bitmap_.UnsafeAppend(values_bitmap, 0, num_values);
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }

Review comment:
       Did give `TransposeInts` a try, but it needed raw `uint8_t*` pointers. So if using it is preferred, how would one retrieve the raw pointers from TypedBufferBuilder, as doing something like:
   ```
   reinterpret_cast<uint8_t*>(groups_.data());
   ```
   throws this error:
   ```
   Reinterpret_cast from 'const unsigned int *' to 'uint8_t *' (aka 'unsigned char *') casts away qualifiers
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] dhruv9vats commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

Posted by GitBox <gi...@apache.org>.
dhruv9vats commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r825937523



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));

Review comment:
       This is what using `CopyData` might look like (it would then be refactored into a helper). Also, `CopyData` does not support decimal types.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(

Review comment:
       Is templating like this okay?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2773,438 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    has_nulls_ = false;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    int64_t offset = values_array_data->offset;
+    const uint8_t* values = values_array_data->buffers[1]->data();
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values));
+    //    RETURN_NOT_OK(values_.Resize(num_args_ + num_values));
+    //    CopyDataUtils<Type>::CopyData(*batch[0].type(), *values_array_data, offset,
+    //                                  values_.bytes_builder()->mutable_data(),
+    //                                  values_.length(), num_values);
+    //    values_.bytes_builder()->UnsafeAdvance(num_values * sizeof(CType));
+
+    if (batch[0].null_count() > 0) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    }
+    num_args_ += num_values;
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    const auto* other_raw_groups = other->groups_.data();
+    const auto* g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+    }
+
+    const uint8_t* values = reinterpret_cast<const uint8_t*>(other->values_.data());
+    RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, 0, other->num_args_));
+
+    if (other->has_nulls_) {
+      if (!has_nulls_) {
+        has_nulls_ = true;
+        RETURN_NOT_OK(values_bitmap_.Append(num_args_, true));
+      }
+      const uint8_t* values_bitmap = other->values_bitmap_.data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, 0, other->num_args_));
+    } else if (has_nulls_) {
+      RETURN_NOT_OK(values_bitmap_.Append(other->num_args_, true));
+    }
+    num_args_ += other->num_args_;
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, groups_buffer);
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_,
+        {has_nulls_ ? std::move(null_bitmap_buffer) : nullptr, std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  bool has_nulls_ = false;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx_->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+    int64_t offset = values_array_data->offset;
+
+    const auto groups_array_data = batch[1].array();
+    DCHECK_EQ(groups_array_data->offset, 0);
+    const auto* groups = groups_array_data->GetValues<uint32_t>(1, 0);
+    RETURN_NOT_OK(groups_.Append(groups, num_values));
+
+    if (batch[0].null_count() == 0) {
+      RETURN_NOT_OK(values_bitmap_.Append(num_values, true));
+    } else {
+      const uint8_t* values_bitmap = values_array_data->buffers[0]->data();
+      RETURN_NOT_OK(GroupedValueTraits<BooleanType>::AppendBuffers(
+          &values_bitmap_, values_bitmap, offset, num_values));
+    }
+    return AppendBuffers<Type>(batch);
+  }
+
+  template <typename T = Type>
+  enable_if_t<is_base_binary_type<T>::value, Status> AppendBuffers(
+      const ExecBatch& batch) {
+    int64_t num_values = batch[0].array()->length;
+    num_args_ += num_values;
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(StringType(val.data(), val.size(), allocator_));
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back("");
+          return Status::OK();
+        });
+  }
+
+  template <typename T = Type>
+  enable_if_t<std::is_same<T, FixedSizeBinaryType>::value, Status> AppendBuffers(
+      const ExecBatch& batch) {
+    const auto values_array_data = batch[0].array();
+    int64_t num_values = values_array_data->length;
+
+    std::string str(
+        checked_cast<const FixedSizeBinaryType&>(*batch[0].type()).byte_width(), '0');
+    StringType s(str.data(), str.size(), allocator_);
+
+    //    values_.reserve(num_values * checked_cast<const
+    //    FixedSizeBinaryType&>(*batch[0].type()).byte_width());
+    values_.resize(values_.size() + num_values, s);
+    CopyDataUtils<FixedSizeBinaryType>::CopyData(

Review comment:
       I don't seem to be getting very far with `FixedSizeBinaryType` and `CopyData`. How do I reserve the correct amount of space in the `std::vector`? `resize` seems to overwrite the copied values sometimes. How can `reserve` be made to work for `util::optional<StringType>`? Do these offsets make sense?
   https://github.com/apache/arrow/blob/ece0e23f1fd5a2ab403e4a645ed01a9c257c0260/cpp/src/arrow/compute/kernels/copy_data_internal.h#L70-L82




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org