You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/23 14:09:06 UTC

[GitHub] [arrow] lidavidm opened a new pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

lidavidm opened a new pull request #10792:
URL: https://github.com/apache/arrow/pull/10792


   Note these don't use pairwise summation and so may be prone to precision issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-890687071


   From conbench it looks like the changes generally don't affect the performance of the existing variance/stddev kernels except for maybe the int32 variance case (likely a fluke?).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676614903



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567

Review comment:
       Should there be the same comment in `GroupedSumImpl`?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -653,6 +653,137 @@ TEST(GroupBy, SumOnly) {
   }
 }
 
+TEST(GroupBy, MeanOnly) {
+  for (bool use_threads : {true, false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+
+    auto table =
+        TableFromJSON(schema({field("argument", float64()), field("key", int64())}), {R"([
+    [1.0,   1],
+    [null,  1]
+                        ])",
+                                                                                      R"([
+    [0.0,   2],
+    [null,  3],
+    [4.0,   null],
+    [3.25,  1],
+    [0.125, 2]
+                        ])",
+                                                                                      R"([
+    [-0.25, 2],
+    [0.75,  null],
+    [null,  3]
+                        ])"});
+
+    ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                         internal::GroupBy({table->GetColumnByName("argument")},
+                                           {table->GetColumnByName("key")},
+                                           {
+                                               {"hash_mean", nullptr},
+                                           },
+                                           use_threads));
+    SortBy({"key_0"}, &aggregated_and_grouped);
+
+    AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                              field("hash_mean", float64()),
+                                              field("key_0", int64()),
+                                          }),
+                                          R"([
+    [2.125,   1],
+    [-0.041666666666666664, 2],
+    [null,   3],
+    [2.375,   null]
+  ])"),
+                            aggregated_and_grouped,
+                            /*verbose=*/true);
+  }
+}
+
+TEST(GroupBy, VarianceAndStddev) {
+  auto batch = RecordBatchFromJSON(
+      schema({field("argument", int32()), field("key", int64())}), R"([
+    [1,   1],
+    [null,  1],
+    [0,   2],
+    [null,  3],
+    [4,   null],
+    [3,  1],
+    [0, 2],
+    [-1, 2],
+    [1,  null],
+    [null,  3]
+  ])");
+
+  ASSERT_OK_AND_ASSIGN(Datum aggregated_and_grouped,
+                       internal::GroupBy(
+                           {
+                               batch->GetColumnByName("argument"),
+                               batch->GetColumnByName("argument"),
+                           },
+                           {
+                               batch->GetColumnByName("key"),
+                           },
+                           {
+                               {"hash_variance", nullptr},
+                               {"hash_stddev", nullptr},
+                           }));
+
+  AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("hash_variance", float64()),
+                                            field("hash_stddev", float64()),
+                                            field("key_0", int64()),
+                                        }),
+                                        R"([
+    [1.0,                 1.0,                1],
+    [0.22222222222222224, 0.4714045207910317, 2],
+    [null,                null,               3],
+    [2.25,                1.5,                null]
+  ])"),
+                          aggregated_and_grouped,
+                          /*verbose=*/true);
+
+  batch = RecordBatchFromJSON(
+      schema({field("argument", float64()), field("key", int64())}), R"([
+    [1.0,   1],
+    [null,  1],
+    [0.0,   2],
+    [null,  3],
+    [4.0,   null],
+    [3.0,  1],
+    [0.0, 2],
+    [-1.0, 2],
+    [1.0,  null],
+    [null,  3]
+  ])");
+
+  ASSERT_OK_AND_ASSIGN(aggregated_and_grouped, internal::GroupBy(
+                                                   {
+                                                       batch->GetColumnByName("argument"),
+                                                       batch->GetColumnByName("argument"),
+                                                   },
+                                                   {
+                                                       batch->GetColumnByName("key"),
+                                                   },
+                                                   {
+                                                       {"hash_variance", nullptr},
+                                                       {"hash_stddev", nullptr},
+                                                   }));
+
+  AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("hash_variance", float64()),
+                                            field("hash_stddev", float64()),
+                                            field("key_0", int64()),
+                                        }),
+                                        R"([
+    [1.0,                 1.0,                1],
+    [0.22222222222222224, 0.4714045207910317, 2],
+    [null,                null,               3],
+    [2.25,                1.5,                null]
+  ])"),
+                          aggregated_and_grouped,
+                          /*verbose=*/true);
+}
+

Review comment:
       These tests are unfortunately verbose, but perhaps you could add nevertheless another test with a different `ddof` (and one group with an insufficient number of non-null values)?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       A pity this is duplicating the existing math from the scalar aggregate kernel.
   
   How would you feel about factoring the underlying math in a simple `VarStdOp<ArrowType>` that you would feed values to? You would have one `VarStdOp` in the scalar aggregate kernel and `num_groups_` ones in the hash aggregate kernel.
   
   That might be a bit different performance-wise because you would have an array-of-structures `std::vector<VarStdOp>` rather than a structure-of-arrays of the current three `BufferBuilder`, but I'm not sure it's really important here.
   
   OTOH, `Consume` would not really benefit because the scalar aggregate kernel uses pairwise-summation for floating-point input.
   
   cc @bkietz for advice

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }
+    return Status::OK();
+  }
+
+  Result<Datum> Finalize() override {
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    double* results = reinterpret_cast<double*>(values->mutable_data());
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const double* m2s = reinterpret_cast<const double*>(m2s_.data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > options_.ddof) {
+        const double variance = m2s[i] / (counts[i] - options_.ddof);
+        results[i] = result_type_ == VarOrStd::Var ? variance : std::sqrt(variance);
+        continue;
+      }
+
+      results[i] = 0;
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  VarOrStd result_type_;
+  VarianceOptions options_;
+  int64_t num_groups_ = 0;
+  // m2 = count * s2 = sum((X-mean)^2)
+  BufferBuilder counts_, means_, m2s_;
+  ExecContext* ctx_;
+  MemoryPool* pool_;
+};
+
+template <typename T, VarOrStd result_type>
+Result<std::unique_ptr<KernelState>> VarStdInit(KernelContext* ctx,
+                                                const KernelInitArgs& args) {
+  auto impl = ::arrow::internal::make_unique<GroupedVarStdImpl<T>>();
+  impl->result_type_ = result_type;
+  RETURN_NOT_OK(impl->Init(ctx->exec_context(), args.options));
+  return std::move(impl);
+}
+
+template <VarOrStd result_type>
+struct GroupedVarStdFactory {
+  template <typename T, typename Enable = enable_if_t<is_integer_type<T>::value ||
+                                                      is_floating_type<T>::value>>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), VarStdInit<T, result_type>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Summing data of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Summing data of type ", type);

Review comment:
       The error message should be updated for this operation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-889041516


   Benchmark runs are scheduled for baseline = 31b60f3a2319cc628fa8c692d441f94fb32322fe and contender = 90c91fa6cdc7c32bf00dd2e88cba27c79dbab717. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Provided benchmark filters do not have any benchmark groups to be executed on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/7a89ec47dd834ea882321fa62be177c5...19dd1f3d74554b8ea67238a094890834/)
   [Skipped :warning: Only ['Python', 'R', 'JavaScript'] langs are supported on ursa-i9-9960x] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/48503bd2b71344a997208fc917431628...5f0704b53eda41ac87f2f2bef0cbdb56/)
   [Scheduled] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/8a465ec2c3c94f44b11a78566dd93734...21f4c3398c254bbdad1f04e4dff64748/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R, JavaScript
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676687330



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       Another option that Ben's mentioned would be to treat scalar aggregation as a hash aggregation with one group, though then we should immediately tackle the pairwise summation issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-889041262


   @ursabot please benchmark lang=C++


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-885665761


   https://issues.apache.org/jira/browse/ARROW-13295


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-891063486


   It's difficult to navigate the conbench results, but the results don't look like concerning in any way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676645072



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       Also @cyb70289 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676583689



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);

Review comment:
       Should it be `static_cast<double>(sums[i]) / counts[i]`?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());

Review comment:
       Will these buffers always 8 bytes aligned?
   Not a real issue. Just to satisfy ubsan.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676693009



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());

Review comment:
       These use BufferBuilder which defer to the allocator, which should give us aligned allocations (and the builder additionally pads allocations).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r677526124



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       I'll try refactoring it instead and compare the performance.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-889041516


   Benchmark runs are scheduled for baseline = 31b60f3a2319cc628fa8c692d441f94fb32322fe and contender = 90c91fa6cdc7c32bf00dd2e88cba27c79dbab717. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Provided benchmark filters do not have any benchmark groups to be executed on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/7a89ec47dd834ea882321fa62be177c5...19dd1f3d74554b8ea67238a094890834/)
   [Skipped :warning: Only ['Python', 'R', 'JavaScript'] langs are supported on ursa-i9-9960x] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/48503bd2b71344a997208fc917431628...5f0704b53eda41ac87f2f2bef0cbdb56/)
   [Finished :arrow_down:0.67% :arrow_up:0.05%] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/8a465ec2c3c94f44b11a78566dd93734...21f4c3398c254bbdad1f04e4dff64748/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R, JavaScript
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r677453068



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       It's unfortunately not very good. See #10813.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-887625130


   Benchmark runs are scheduled for baseline = 31b60f3a2319cc628fa8c692d441f94fb32322fe and contender = df994626e0e6186edb48039e5f768b445986d13c. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Provided benchmark filters do not have any benchmark groups to be executed on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/7a89ec47dd834ea882321fa62be177c5...691bcfcbd03643afaf7c670dc34c3456/)
   [Skipped :warning: Only ['Python', 'R', 'JavaScript'] langs are supported on ursa-i9-9960x] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/48503bd2b71344a997208fc917431628...b95bb718815e414cad0642e0800a44da/)
   [Failed] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/8a465ec2c3c94f44b11a78566dd93734...550b0f83a9a74ad4b45d9f5406c6d9a1/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R, JavaScript
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10792:
URL: https://github.com/apache/arrow/pull/10792


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-887624968


   @ursabot please benchmark lang=C++


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676913833



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       I'll see if I can get a comparison up before I'm out.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou edited a comment on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-891063486


   It's difficult to navigate the conbench results, but the results don't seem concerning in any way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#discussion_r676867859



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1005,6 +1007,325 @@ struct GroupedSumFactory {
   InputType argument_type;
 };
 
+// ----------------------------------------------------------------------
+// Mean implementation
+
+template <typename Type>
+struct GroupedMeanImpl : public GroupedSumImpl<Type> {
+  Result<Datum> Finalize() override {
+    using SumType = typename GroupedSumImpl<Type>::SumType;
+    std::shared_ptr<Buffer> null_bitmap;
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> values,
+                          AllocateBuffer(num_groups_ * sizeof(double), pool_));
+    int64_t null_count = 0;
+
+    const int64_t* counts = reinterpret_cast<const int64_t*>(counts_.data());
+    const auto* sums = reinterpret_cast<const SumType*>(sums_.data());
+    double* means = reinterpret_cast<double*>(values->mutable_data());
+    for (int64_t i = 0; i < num_groups_; ++i) {
+      if (counts[i] > 0) {
+        means[i] = static_cast<double>(sums[i] / counts[i]);
+        continue;
+      }
+      means[i] = 0;
+
+      if (null_bitmap == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(null_bitmap, AllocateBitmap(num_groups_, pool_));
+        BitUtil::SetBitsTo(null_bitmap->mutable_data(), 0, num_groups_, true);
+      }
+
+      null_count += 1;
+      BitUtil::SetBitTo(null_bitmap->mutable_data(), i, false);
+    }
+
+    return ArrayData::Make(float64(), num_groups_,
+                           {std::move(null_bitmap), std::move(values)}, null_count);
+  }
+
+  std::shared_ptr<DataType> out_type() const override { return float64(); }
+
+  using GroupedSumImpl<Type>::num_groups_;
+  using GroupedSumImpl<Type>::pool_;
+  using GroupedSumImpl<Type>::counts_;
+  using GroupedSumImpl<Type>::sums_;
+};
+
+struct GroupedMeanFactory {
+  template <typename T, typename AccType = typename FindAccumulatorType<T>::Type>
+  Status Visit(const T&) {
+    kernel = MakeKernel(std::move(argument_type), HashAggregateInit<GroupedMeanImpl<T>>);
+    return Status::OK();
+  }
+
+  Status Visit(const HalfFloatType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  Status Visit(const DataType& type) {
+    return Status::NotImplemented("Computing mean of type ", type);
+  }
+
+  static Result<HashAggregateKernel> Make(const std::shared_ptr<DataType>& type) {
+    GroupedMeanFactory factory;
+    factory.argument_type = InputType::Array(type);
+    RETURN_NOT_OK(VisitTypeInline(*type, &factory));
+    return std::move(factory.kernel);
+  }
+
+  HashAggregateKernel kernel;
+  InputType argument_type;
+};
+
+// Variance/Stdev implementation
+
+using arrow::internal::int128_t;
+
+template <typename Type>
+struct GroupedVarStdImpl : public GroupedAggregator {
+  using CType = typename Type::c_type;
+
+  Status Init(ExecContext* ctx, const FunctionOptions* options) override {
+    options_ = *checked_cast<const VarianceOptions*>(options);
+    ctx_ = ctx;
+    pool_ = ctx->memory_pool();
+    counts_ = BufferBuilder(pool_);
+    means_ = BufferBuilder(pool_);
+    m2s_ = BufferBuilder(pool_);
+    return Status::OK();
+  }
+
+  Status Resize(int64_t new_num_groups) override {
+    auto added_groups = new_num_groups - num_groups_;
+    num_groups_ = new_num_groups;
+    RETURN_NOT_OK(counts_.Append(added_groups * sizeof(int64_t), 0));
+    RETURN_NOT_OK(means_.Append(added_groups * sizeof(double), 0));
+    RETURN_NOT_OK(m2s_.Append(added_groups * sizeof(double), 0));
+    return Status::OK();
+  }
+
+  Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); }
+
+  // float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
+  // (see aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    using SumType =
+        typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
+
+    int64_t* counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    double* means = reinterpret_cast<double*>(means_.mutable_data());
+    double* m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    // XXX this uses naive summation; we should switch to pairwise summation as was
+    // done for the scalar aggregate kernel in ARROW-11567
+    std::vector<SumType> sums(num_groups_);
+    auto g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          sums[*g] += value;
+          counts[*g] += 1;
+          ++g;
+        },
+        [&] { ++g; });
+
+    for (int64_t i = 0; i < num_groups_; i++) {
+      means[i] = static_cast<double>(sums[i]) / counts[i];
+    }
+
+    g = batch[1].array()->GetValues<uint32_t>(1);
+    VisitArrayDataInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType value) {
+          const double v = static_cast<double>(value);
+          m2s[*g] += (v - means[*g]) * (v - means[*g]);
+          ++g;
+        },
+        [&] { ++g; });
+
+    return Status::OK();
+  }
+
+  // int32/16/8: textbook one pass algorithm with integer arithmetic (see
+  // aggregate_var_std.cc)
+  template <typename T = Type>
+  enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4), Status> ConsumeImpl(
+      const ExecBatch& batch) {
+    // max number of elements that sum will not overflow int64 (2Gi int32 elements)
+    // for uint32:    0 <= sum < 2^63 (int64 >= 0)
+    // for int32: -2^62 <= sum < 2^62
+    constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8);
+
+    const auto& array = *batch[0].array();
+    const auto g = batch[1].array()->GetValues<uint32_t>(1);
+
+    std::vector<int64_t> sum(num_groups_);
+    std::vector<int128_t> square_sum(num_groups_);
+
+    ARROW_ASSIGN_OR_RAISE(auto mapping,
+                          AllocateBuffer(num_groups_ * sizeof(uint32_t), pool_));
+    for (uint32_t i = 0; static_cast<int64_t>(i) < num_groups_; i++) {
+      reinterpret_cast<uint32_t*>(mapping->mutable_data())[i] = i;
+    }
+    ArrayData group_id_mapping(uint32(), num_groups_, {nullptr, std::move(mapping)},
+                               /*null_count=*/0);
+
+    const CType* values = array.GetValues<CType>(1);
+
+    for (int64_t start_index = 0; start_index < batch.length; start_index += max_length) {
+      // process in chunks that overflow will never happen
+
+      // reset state
+      std::fill(sum.begin(), sum.end(), 0);
+      std::fill(square_sum.begin(), square_sum.end(), 0);
+      GroupedVarStdImpl<Type> state;
+      RETURN_NOT_OK(state.Init(ctx_, &options_));
+      RETURN_NOT_OK(state.Resize(num_groups_));
+      int64_t* other_counts = reinterpret_cast<int64_t*>(state.counts_.mutable_data());
+      double* other_means = reinterpret_cast<double*>(state.means_.mutable_data());
+      double* other_m2s = reinterpret_cast<double*>(state.m2s_.mutable_data());
+
+      arrow::internal::VisitSetBitRunsVoid(
+          array.buffers[0], array.offset + start_index,
+          std::min(max_length, batch.length - start_index),
+          [&](int64_t pos, int64_t len) {
+            for (int64_t i = 0; i < len; ++i) {
+              const int64_t index = start_index + pos + i;
+              const auto value = values[index];
+              sum[g[index]] += value;
+              square_sum[g[index]] += static_cast<uint64_t>(value) * value;
+              other_counts[g[index]]++;
+            }
+          });
+
+      for (int64_t i = 0; i < num_groups_; i++) {
+        if (other_counts[i] == 0) continue;
+
+        const double mean = static_cast<double>(sum[i]) / other_counts[i];
+        // calculate m2 = square_sum - sum * sum / count
+        // decompose `sum * sum / count` into integers and fractions
+        const int128_t sum_square = static_cast<int128_t>(sum[i]) * sum[i];
+        const int128_t integers = sum_square / other_counts[i];
+        const double fractions =
+            static_cast<double>(sum_square % other_counts[i]) / other_counts[i];
+        const double m2 = static_cast<double>(square_sum[i] - integers) - fractions;
+
+        other_means[i] = mean;
+        other_m2s[i] = m2;
+      }
+      RETURN_NOT_OK(this->Merge(std::move(state), group_id_mapping));
+    }
+    return Status::OK();
+  }
+
+  Status Merge(GroupedAggregator&& raw_other,
+               const ArrayData& group_id_mapping) override {
+    // Combine m2 from two chunks (see aggregate_var_std.cc)
+    auto other = checked_cast<GroupedVarStdImpl*>(&raw_other);
+
+    auto counts = reinterpret_cast<int64_t*>(counts_.mutable_data());
+    auto means = reinterpret_cast<double*>(means_.mutable_data());
+    auto m2s = reinterpret_cast<double*>(m2s_.mutable_data());
+
+    const auto* other_counts = reinterpret_cast<const int64_t*>(other->counts_.data());
+    const auto* other_means = reinterpret_cast<const double*>(other->means_.data());
+    const auto* other_m2s = reinterpret_cast<const double*>(other->m2s_.data());
+
+    auto g = group_id_mapping.GetValues<uint32_t>(1);
+    for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
+      if (other_counts[other_g] == 0) continue;
+      const double mean =
+          (means[*g] * counts[*g] + other_means[other_g] * other_counts[other_g]) /
+          (counts[*g] + other_counts[other_g]);
+      m2s[*g] += other_m2s[other_g] +
+                 counts[*g] * (means[*g] - mean) * (means[*g] - mean) +
+                 other_counts[other_g] * (other_means[other_g] - mean) *
+                     (other_means[other_g] - mean);
+      counts[*g] += other_counts[other_g];
+      means[*g] = mean;
+    }

Review comment:
       https://issues.apache.org/jira/browse/ARROW-13451




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot commented on pull request #10792: ARROW-13295: [C++] add hash_mean, hash_variance, hash_stddev kernels

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #10792:
URL: https://github.com/apache/arrow/pull/10792#issuecomment-887625130


   Benchmark runs are scheduled for baseline = 31b60f3a2319cc628fa8c692d441f94fb32322fe and contender = df994626e0e6186edb48039e5f768b445986d13c. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Provided benchmark filters do not have any benchmark groups to be executed on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2 (mimalloc)](https://conbench.ursa.dev/compare/runs/7a89ec47dd834ea882321fa62be177c5...691bcfcbd03643afaf7c670dc34c3456/)
   [Skipped :warning: Only ['Python', 'R', 'JavaScript'] langs are supported on ursa-i9-9960x] [ursa-i9-9960x (mimalloc)](https://conbench.ursa.dev/compare/runs/48503bd2b71344a997208fc917431628...b95bb718815e414cad0642e0800a44da/)
   [Scheduled] [ursa-thinkcentre-m75q (mimalloc)](https://conbench.ursa.dev/compare/runs/8a465ec2c3c94f44b11a78566dd93734...550b0f83a9a74ad4b45d9f5406c6d9a1/)
   Supported benchmarks:
   ursa-i9-9960x: langs = Python, R, JavaScript
   ursa-thinkcentre-m75q: langs = C++, Java
   ec2-t3-xlarge-us-east-2: cloud = True
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org