You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/24 13:59:25 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #12484: ARROW-15152: [C++][Compute] Implement hash_list aggregate function

lidavidm commented on a change in pull request #12484:
URL: https://github.com/apache/arrow/pull/12484#discussion_r813900617



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();

Review comment:
       The same applies here: since we're dealing only with primitive types, is there a way to just concatenate the array's buffers with ours?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);

Review comment:
       This is probably as good as it gets

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       Or really: the `else` block is appending _the exact same values already in the other arrays_, so if we can just concatenate the two arrays and avoid the branching, that should presumably be much faster. (At least for values_ and values_bitmap_; groups_ probably still has to be done element-by-element, unfortunately. Even then, you could perhaps reserve enough capacity in groups_ and then use TransposeInts to do it all in one go.)

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);

Review comment:
       allocator_ is not a parameter of emplace in the first place: emplace/emplace_back forward their arguments to the std::basic_string constructor, so passing allocator_ as an extra argument should still work here.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back(util::string_view{});
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       I think we can just concatenate the vectors here, as above.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {

Review comment:
       I would assume we want to preserve null values seen in the other group, no? If so, is there a way to just concatenate the two arrays instead of copying values one by one?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -2758,6 +2758,318 @@ struct GroupedOneFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// List implementation
+
+template <typename Type, typename Enable = void>
+struct GroupedListImpl final : public GroupedAggregator {
+  using CType = typename TypeTraits<Type>::CType;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    // out_type_ initialized by GroupedListInit
+    values_ = TypedBufferBuilder<CType>(ctx_->memory_pool());
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, CType val) -> Status {
+          RETURN_NOT_OK(values_.Append(val));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(false));
+          ++num_args_;
+          return Status::OK();
+        });
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    auto other = checked_cast<GroupedListImpl*>(&raw_other);
+    auto other_raw_values = other->values_.mutable_data();
+    uint32_t* other_raw_groups = other->groups_.mutable_data();
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+
+    for (uint32_t other_g = 0; static_cast<int64_t>(other_g) < other->num_args_;
+         ++other_g) {
+      if (bit_util::GetBit(other->values_bitmap_.data(), other_g)) {
+        RETURN_NOT_OK(values_.Append(GetSet::Get(other_raw_values, other_g)));
+        RETURN_NOT_OK(values_bitmap_.Append(true));
+      } else {
+        RETURN_NOT_OK(values_.Append(static_cast<CType>(0)));
+        RETURN_NOT_OK(values_bitmap_.Append(false));
+      }
+      RETURN_NOT_OK(groups_.Append(g[other_raw_groups[other_g]]));
+      ++num_args_;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    ARROW_ASSIGN_OR_RAISE(auto values_buffer, values_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto groups_buffer, groups_.Finish());
+    ARROW_ASSIGN_OR_RAISE(auto null_bitmap_buffer, values_bitmap_.Finish());
+
+    auto groups = UInt32Array(num_args_, std::move(groups_buffer));
+    ARROW_ASSIGN_OR_RAISE(
+        auto groupings,
+        Grouper::MakeGroupings(groups, static_cast<uint32_t>(num_groups_), ctx_));
+
+    auto values_array_data = ArrayData::Make(
+        out_type_, num_args_, {std::move(null_bitmap_buffer), std::move(values_buffer)});
+    auto values = MakeArray(values_array_data);
+    return Grouper::ApplyGroupings(*groupings, *values);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return list(out_type_); }
+
+  ExecContext* ctx_;
+  int64_t num_groups_, num_args_ = 0;
+  TypedBufferBuilder<CType> values_;
+  TypedBufferBuilder<uint32_t> groups_;
+  TypedBufferBuilder<bool> values_bitmap_;
+  std::shared_ptr<DataType> out_type_;
+};
+
+template <typename Type>
+struct GroupedListImpl<Type, enable_if_t<is_base_binary_type<Type>::value ||
+                                         std::is_same<Type, FixedSizeBinaryType>::value>>
+    final : public GroupedAggregator {
+  using Allocator = arrow::stl::allocator<char>;
+  using StringType = std::basic_string<char, std::char_traits<char>, Allocator>;
+  using GetSet = GroupedValueTraits<Type>;
+
+  Status Init(ExecContext* ctx, const std::vector<ValueDescr>&,
+              const FunctionOptions* options) override {
+    ctx_ = ctx;
+    allocator_ = Allocator(ctx->memory_pool());
+    // out_type_ initialized by GroupedListInit
+    groups_ = TypedBufferBuilder<uint32_t>(ctx_->memory_pool());
+    values_bitmap_ = TypedBufferBuilder<bool>(ctx_->memory_pool());
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    num_groups_ = new_num_groups;
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override {
+    return VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t group, util::string_view val) -> Status {
+          values_.emplace_back(val);
+          RETURN_NOT_OK(groups_.Append(group));
+          RETURN_NOT_OK(values_bitmap_.Append(true));
+          ++num_args_;
+          return Status::OK();
+        },
+        [&](uint32_t group) -> Status {
+          values_.emplace_back(util::string_view{});

Review comment:
       Or just "".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org