Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/24 22:05:59 UTC

[GitHub] [arrow] lidavidm opened a new pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

lidavidm opened a new pull request #10994:
URL: https://github.com/apache/arrow/pull/10994


   Also fixes a major bug in grouped var/std, where multiple batches fed to the same state instance would improperly update state.
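The description does not say how the var/std state update was corrected. As a rough illustration only, one common way to keep variance state consistent across several batches is to summarize each batch separately and then merge the summaries pairwise. The names `VarState`, `FromBatch`, `Merge` and `Variance` below are hypothetical and are not taken from the Arrow sources.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical per-group running state: count, mean, and the sum of squared
// deviations from the mean (often called M2).
struct VarState {
  int64_t count = 0;
  double mean = 0.0;
  double m2 = 0.0;
};

// Summarize one batch in isolation using Welford's update.
VarState FromBatch(const std::vector<double>& values) {
  VarState s;
  for (double v : values) {
    s.count++;
    const double delta = v - s.mean;
    s.mean += delta / static_cast<double>(s.count);
    s.m2 += delta * (v - s.mean);
  }
  return s;
}

// Fold the summary of another batch into `self` (pairwise combination).
void Merge(VarState* self, const VarState& other) {
  assert(self != nullptr);
  if (other.count == 0) return;
  if (self->count == 0) {
    *self = other;
    return;
  }
  const double n_a = static_cast<double>(self->count);
  const double n_b = static_cast<double>(other.count);
  const double delta = other.mean - self->mean;
  self->m2 += other.m2 + delta * delta * n_a * n_b / (n_a + n_b);
  self->mean += delta * n_b / (n_a + n_b);
  self->count += other.count;
}

// Population variance (ddof = 0); NaN when the group is empty.
double Variance(const VarState& s) {
  if (s.count == 0) return std::numeric_limits<double>::quiet_NaN();
  return s.m2 / static_cast<double>(s.count);
}
```

Merging the batches `{1, 1}`, `{}` and `{2}` this way gives a variance of 2/9, which agrees with the `0.222222` expected for key 1 in the test added by this PR.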


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#issuecomment-905008327


   https://issues.apache.org/jira/browse/ARROW-13737





[GitHub] [arrow] nealrichardson closed pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
nealrichardson closed pull request #10994:
URL: https://github.com/apache/arrow/pull/10994


   





[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r696849172



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1032,6 +1169,55 @@ TEST(GroupBy, TDigest) {
       /*verbose=*/true);
 }
 
+TEST(GroupBy, StddevVarianceTDigestScalar) {
+  BatchesWithSchema input;
+  input.batches = {
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[1, 1.0, 1], [1, 1.0, 1], [1, 1.0, 2], [1, 1.0, 3]]"),
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[null, null, 1], [null, null, 1], [null, null, 2], [null, null, 3]]"),
+      ExecBatchFromJSON({int32(), float32(), int64()},
+                        "[[2, 2.0, 1], [3, 3.0, 2], [4, 4.0, 3]]"),
+  };
+  input.schema = schema(
+      {field("argument", int32()), field("argument1", float32()), field("key", int64())});
+
+  for (bool use_threads : {false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {"argument", "argument", "argument",
+                                               "argument1", "argument1", "argument1"},
+                                              {
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                              },
+                                              use_threads, default_exec_context()));
+    Datum expected =
+        ArrayFromJSON(struct_({
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("key", int64()),
+                      }),
+                      R"([
+         [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
+         [1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0], 2],
+         [1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0], 3]

Review comment:
       Ah, too bad.







[GitHub] [arrow] felipeblazing commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
felipeblazing commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695975190



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -933,31 +967,36 @@ struct GroupedReducingAggregator : public GroupedAggregator {
   }
 
   Status Consume(const ExecBatch& batch) override {
-    c_type* reduced = reduced_.mutable_data();
+    CType* reduced = reduced_.mutable_data();
     int64_t* counts = counts_.mutable_data();
     uint8_t* no_nulls = no_nulls_.mutable_data();
 
-    auto g = batch[1].array()->GetValues<uint32_t>(1);
-
-    return Impl::Consume(*batch[0].array(), reduced, counts, no_nulls, g);
+    VisitGroupedValues<Type>(
+        batch,
+        [&](uint32_t g, InputCType value) {

Review comment:
       Can we also capture explicitly here?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1833,30 +1853,45 @@ struct GroupedBooleanAggregator : public GroupedAggregator {
     uint8_t* seen = seen_.mutable_data();
     uint8_t* no_nulls = no_nulls_.mutable_data();
     int64_t* counts = counts_.mutable_data();
-    const auto& input = *batch[0].array();
     auto g = batch[1].array()->GetValues<uint32_t>(1);
 
-    if (input.MayHaveNulls()) {
-      const uint8_t* bitmap = input.buffers[1]->data();
-      arrow::internal::VisitBitBlocksVoid(
-          input.buffers[0], input.offset, input.length,
-          [&](int64_t position) {
-            counts[*g]++;
-            Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
-            g++;
-          },
-          [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+    if (batch[0].is_array()) {
+      const auto& input = *batch[0].array();
+      if (input.MayHaveNulls()) {
+        const uint8_t* bitmap = input.buffers[1]->data();
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[0], input.offset, input.length,
+            [&](int64_t position) {
+              counts[*g]++;
+              Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
+              g++;
+            },
+            [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+      } else {
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[1], input.offset, input.length,
+            [&](int64_t) {
+              Impl::UpdateGroupWith(seen, *g, true);

Review comment:
       Just a question, but could you briefly explain what `seen`'s purpose is here?

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -824,6 +824,36 @@ Status AddHashAggKernels(
   return Status::OK();
 }
 
+template <typename Type, typename ConsumeValue, typename ConsumeNull>
+void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+                        ConsumeNull&& null_func) {
+  auto g = batch[1].array()->GetValues<uint32_t>(1);
+  if (batch[0].is_array()) {
+    VisitArrayValuesInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType val) { valid_func(*g++, val); },

Review comment:
       Is there a reason we don't want to be explicit in our capture here?







[GitHub] [arrow] lidavidm commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695998614



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1833,30 +1853,45 @@ struct GroupedBooleanAggregator : public GroupedAggregator {
     uint8_t* seen = seen_.mutable_data();
     uint8_t* no_nulls = no_nulls_.mutable_data();
     int64_t* counts = counts_.mutable_data();
-    const auto& input = *batch[0].array();
     auto g = batch[1].array()->GetValues<uint32_t>(1);
 
-    if (input.MayHaveNulls()) {
-      const uint8_t* bitmap = input.buffers[1]->data();
-      arrow::internal::VisitBitBlocksVoid(
-          input.buffers[0], input.offset, input.length,
-          [&](int64_t position) {
-            counts[*g]++;
-            Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
-            g++;
-          },
-          [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+    if (batch[0].is_array()) {
+      const auto& input = *batch[0].array();
+      if (input.MayHaveNulls()) {
+        const uint8_t* bitmap = input.buffers[1]->data();
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[0], input.offset, input.length,
+            [&](int64_t position) {
+              counts[*g]++;
+              Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
+              g++;
+            },
+            [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+      } else {
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[1], input.offset, input.length,
+            [&](int64_t) {
+              Impl::UpdateGroupWith(seen, *g, true);

Review comment:
       `seen` is perhaps a poor name (I'll rename it) but it's the current value of the aggregation (i.e. `any_true` or `all_true`).
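To make the role of `seen` concrete, here is a minimal, self-contained sketch of a grouped any/all reduction in which one running result is kept per group. It assumes plain `std::vector` storage instead of Arrow buffers, and `AnyImpl`, `AllImpl` and `GroupedBoolean` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the two boolean aggregations: each defines the
// starting value of a group and how a new input is folded into it.
struct AnyImpl {
  static bool Initial() { return false; }
  static bool Update(bool current, bool value) { return current || value; }
};
struct AllImpl {
  static bool Initial() { return true; }
  static bool Update(bool current, bool value) { return current && value; }
};

// `reduced` plays the role of `seen`: the current aggregation result for
// each group, updated once per input row.
template <typename Impl>
std::vector<bool> GroupedBoolean(const std::vector<uint32_t>& group_ids,
                                 const std::vector<bool>& values,
                                 std::size_t num_groups) {
  assert(group_ids.size() == values.size());
  std::vector<bool> reduced(num_groups, Impl::Initial());
  for (std::size_t i = 0; i < values.size(); ++i) {
    const uint32_t g = group_ids[i];
    assert(g < num_groups);
    reduced[g] = Impl::Update(reduced[g], values[i]);
  }
  return reduced;
}
```

With group ids `{0, 1, 0, 1}` and values `{true, false, false, false}`, the "any" result is true for group 0 only, and the "all" result is false for both groups.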







[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695992451



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -824,6 +824,36 @@ Status AddHashAggKernels(
   return Status::OK();
 }
 
+template <typename Type, typename ConsumeValue, typename ConsumeNull>
+void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+                        ConsumeNull&& null_func) {
+  auto g = batch[1].array()->GetValues<uint32_t>(1);
+  if (batch[0].is_array()) {
+    VisitArrayValuesInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType val) { valid_func(*g++, val); },

Review comment:
       We may capture explicitly when lifetime is at stake, e.g. in async code where it's important to delineate exactly what needs to survive past the enclosing scope. Here, the callable is executed synchronously, so listing every capture explicitly is more of a nuisance, both to type and to read.
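The distinction can be illustrated with standard C++ alone. This is a generic sketch rather than Arrow code, and `SumNow` and `SumLater` are hypothetical names: the first lambda runs to completion inside its enclosing scope, the second is returned and invoked later.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Synchronous use: the lambda runs and finishes before `total` goes out of
// scope, so a blanket by-reference capture is safe.
int SumNow(const std::vector<int>& values) {
  int total = 0;
  auto add = [&](int v) { total += v; };
  for (int v : values) add(v);
  return total;
}

// Deferred use: the callable outlives this function, so it must own what it
// needs. Naming the capture documents exactly what is copied into it.
std::function<int()> SumLater(const std::vector<int>& values) {
  std::vector<int> copy = values;
  return [copy]() {
    int total = 0;
    for (int v : copy) total += v;
    return total;
  };
}
```

Capturing by reference in `SumLater` would leave the returned callable holding a dangling reference, which is the kind of mistake an explicit capture list helps catch.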







[GitHub] [arrow] lidavidm commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695853274



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1032,6 +1169,55 @@ TEST(GroupBy, TDigest) {
       /*verbose=*/true);
 }
 
+TEST(GroupBy, StddevVarianceTDigestScalar) {
+  BatchesWithSchema input;
+  input.batches = {
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[1, 1.0, 1], [1, 1.0, 1], [1, 1.0, 2], [1, 1.0, 3]]"),
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[null, null, 1], [null, null, 1], [null, null, 2], [null, null, 3]]"),
+      ExecBatchFromJSON({int32(), float32(), int64()},
+                        "[[2, 2.0, 1], [3, 3.0, 2], [4, 4.0, 3]]"),
+  };
+  input.schema = schema(
+      {field("argument", int32()), field("argument1", float32()), field("key", int64())});
+
+  for (bool use_threads : {false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {"argument", "argument", "argument",
+                                               "argument1", "argument1", "argument1"},
+                                              {
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                              },
+                                              use_threads, default_exec_context()));
+    Datum expected =
+        ArrayFromJSON(struct_({
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("key", int64()),
+                      }),
+                      R"([
+         [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
+         [1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0], 2],
+         [1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0], 3]

Review comment:
       Oddly enough, that is what our tdigest implementation gives, even in the non-grouped case.
   
   ```
   >>> import pyarrow as pa
   >>> import pyarrow.compute as pc
   >>> pa.__version__
   '5.0.0'
   >>> pc.tdigest([1, 1, 2])
   <pyarrow.lib.DoubleArray object at 0x7f916b5bee80>
   [
     1
   ]
   >>> pc.tdigest([1, 4])
   <pyarrow.lib.DoubleArray object at 0x7f916b5d90a0>
   [
     1
   ]
   >>> pc.tdigest([1, 3])
   <pyarrow.lib.DoubleArray object at 0x7f916b5bee80>
   [
     1
   ]
   ```

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1138,6 +1160,22 @@ struct GroupedProductImpl final
     return Status::OK();
   }
 
+  static Status Consume(const Scalar& value, const int64_t count, c_type* reduced,
+                        int64_t* counts, uint8_t* no_nulls, const uint32_t* g) {
+    if (value.is_valid) {
+      const auto v = to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+      for (int i = 0; i < count; i++) {
+        reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) * v);

Review comment:
       Yes, this is a bad merge on my part - I will fix.







[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695832254



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1700,14 +1802,30 @@ struct GroupedMinMaxImpl : public GroupedAggregator {
     auto raw_mins = reinterpret_cast<CType*>(mins_.mutable_data());
     auto raw_maxes = reinterpret_cast<CType*>(maxes_.mutable_data());
 
-    VisitArrayValuesInline<Type>(
-        *batch[0].array(),
-        [&](CType val) {
+    if (batch[0].is_array()) {
+      VisitArrayValuesInline<Type>(
+          *batch[0].array(),
+          [&](CType val) {
+            raw_maxes[*g] = std::max(raw_maxes[*g], val);
+            raw_mins[*g] = std::min(raw_mins[*g], val);
+            BitUtil::SetBit(has_values_.mutable_data(), *g++);
+          },
+          [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+    } else {
+      const auto& input = *batch[0].scalar();
+      if (input.is_valid) {
+        const auto val = UnboxScalar<Type>::Unbox(input);
+        for (int64_t i = 0; i < batch.length; i++) {
           raw_maxes[*g] = std::max(raw_maxes[*g], val);
           raw_mins[*g] = std::min(raw_mins[*g], val);
           BitUtil::SetBit(has_values_.mutable_data(), *g++);
-        },
-        [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+        }
+      } else {
+        for (int64_t i = 0; i < batch.length; i++) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), *g++);
+        }
+      }
+    }

Review comment:
       So it seems it would be nicer to write this as:
   ```c++
     auto consume_value = [&](uint32_t g, CType val) {
       raw_maxes[g] = std::max(raw_maxes[g], val);
       raw_mins[g] = std::min(raw_mins[g], val);
       BitUtil::SetBit(has_values_.mutable_data(), g);
     };
     auto consume_null = [&](uint32_t g) {
       BitUtil::SetBit(has_nulls_.mutable_data(), g);
     };
     VisitGroupedValues<Type>(batch, std::move(consume_value), std::move(consume_null));
   ```
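For readers without the Arrow sources at hand, the idea behind such a helper can be sketched in a self-contained way: dispatch once on whether the input is an array or a scalar, and hand each row to one of two callbacks. `Column` and `VisitGrouped` below are simplified, hypothetical stand-ins for `ExecBatch` and `VisitGroupedValues`, not the actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for one batch argument: either a per-row
// array with validity flags, or a single nullable value that is logically
// repeated for every row.
template <typename T>
struct Column {
  bool is_array;
  std::vector<T> values;    // array case: one value per row
  std::vector<bool> valid;  // array case: validity per row
  T scalar_value;           // scalar case
  bool scalar_valid;        // scalar case
};

// Calls valid_func(group, value) for non-null rows and null_func(group) for
// null rows, regardless of whether the input is an array or a scalar.
template <typename T, typename ConsumeValue, typename ConsumeNull>
void VisitGrouped(const Column<T>& col, const std::vector<uint32_t>& group_ids,
                  ConsumeValue&& valid_func, ConsumeNull&& null_func) {
  if (col.is_array) {
    assert(col.values.size() == group_ids.size());
    assert(col.valid.size() == group_ids.size());
    for (std::size_t i = 0; i < group_ids.size(); ++i) {
      if (col.valid[i]) {
        valid_func(group_ids[i], col.values[i]);
      } else {
        null_func(group_ids[i]);
      }
    }
  } else if (col.scalar_valid) {
    for (uint32_t g : group_ids) valid_func(g, col.scalar_value);
  } else {
    for (uint32_t g : group_ids) null_func(g);
  }
}
```

Each aggregator then only supplies its two callbacks; a variant that ignores nulls could pass a no-op as the second one.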
   







[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r696850366



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -824,6 +824,36 @@ Status AddHashAggKernels(
   return Status::OK();
 }
 
+template <typename Type, typename ConsumeValue, typename ConsumeNull>
+void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+                        ConsumeNull&& null_func) {
+  auto g = batch[1].array()->GetValues<uint32_t>(1);
+  if (batch[0].is_array()) {
+    VisitArrayValuesInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType val) { valid_func(*g++, val); },

Review comment:
       @felipeblazing Out of curiosity, why do you think capturing explicitly would be better here?










[GitHub] [arrow] lidavidm commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695892300



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1700,14 +1802,30 @@ struct GroupedMinMaxImpl : public GroupedAggregator {
     auto raw_mins = reinterpret_cast<CType*>(mins_.mutable_data());
     auto raw_maxes = reinterpret_cast<CType*>(maxes_.mutable_data());
 
-    VisitArrayValuesInline<Type>(
-        *batch[0].array(),
-        [&](CType val) {
+    if (batch[0].is_array()) {
+      VisitArrayValuesInline<Type>(
+          *batch[0].array(),
+          [&](CType val) {
+            raw_maxes[*g] = std::max(raw_maxes[*g], val);
+            raw_mins[*g] = std::min(raw_mins[*g], val);
+            BitUtil::SetBit(has_values_.mutable_data(), *g++);
+          },
+          [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+    } else {
+      const auto& input = *batch[0].scalar();
+      if (input.is_valid) {
+        const auto val = UnboxScalar<Type>::Unbox(input);
+        for (int64_t i = 0; i < batch.length; i++) {
           raw_maxes[*g] = std::max(raw_maxes[*g], val);
           raw_mins[*g] = std::min(raw_mins[*g], val);
           BitUtil::SetBit(has_values_.mutable_data(), *g++);
-        },
-        [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+        }
+      } else {
+        for (int64_t i = 0; i < batch.length; i++) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), *g++);
+        }
+      }
+    }

Review comment:
       You gave me hope that a helper like this already existed :( But that is a good idea, it would clean things up quite a bit.







[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695764540



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -206,6 +206,66 @@ Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
                            plan->sources()[0]->outputs()[0]->output_schema()->fields());
 }
 
+Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
+                                   const std::vector<std::string>& key_names,
+                                   const std::vector<std::string>& arg_names,
+                                   const std::vector<internal::Aggregate>& aggregates,
+                                   bool use_threads, ExecContext* ctx) {
+  std::vector<FieldRef> keys(key_names.size());
+  std::vector<FieldRef> targets(aggregates.size());
+  std::vector<std::string> names(aggregates.size());
+  for (size_t i = 0; i < aggregates.size(); ++i) {
+    names[i] = aggregates[i].function;
+    targets[i] = FieldRef(arg_names[i]);
+  }
+  for (size_t i = 0; i < key_names.size(); ++i) {
+    keys[i] = FieldRef(key_names[i]);
+  }
+
+  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(ctx));

Review comment:
       It looks like most of this could be factored out to reuse in both `GroupByUsingExecPlan` overloads.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1547,13 +1641,21 @@ struct GroupedTDigestImpl : public GroupedAggregator {
 
   Status Consume(const ExecBatch& batch) override {
     auto g = batch[1].array()->GetValues<uint32_t>(1);
-    VisitArrayDataInline<Type>(
-        *batch[0].array(),
-        [&](typename TypeTraits<Type>::CType value) {
-          this->tdigests_[*g].NanAdd(value);
-          ++g;
-        },
-        [&] { ++g; });
+    if (batch[0].is_array()) {
+      VisitArrayDataInline<Type>(
+          *batch[0].array(),
+          [&](typename TypeTraits<Type>::CType value) {
+            this->tdigests_[*g].NanAdd(value);
+            ++g;
+          },
+          [&] { ++g; });
+    } else if (batch[0].scalar()->is_valid) {
+      typename TypeTraits<Type>::CType value =
+          UnboxScalar<Type>::Unbox(*batch[0].scalar());
+      for (int64_t i = 0; i < batch.length; i++) {
+        this->tdigests_[*g++].NanAdd(value);
+      }
+    }

Review comment:
       May be nicer if written as:
   ```c++
     auto consume_value = [&](uint32_t g, CType val) {
       this->tdigests_[g].NanAdd(val);
     };
     VisitGroupedNonNullValues<Type>(batch, std::move(consume_value));
   ```
   

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1833,30 +1951,41 @@ struct GroupedBooleanAggregator : public GroupedAggregator {
     uint8_t* seen = seen_.mutable_data();
     uint8_t* no_nulls = no_nulls_.mutable_data();
     int64_t* counts = counts_.mutable_data();
-    const auto& input = *batch[0].array();
     auto g = batch[1].array()->GetValues<uint32_t>(1);
 
-    if (input.MayHaveNulls()) {
-      const uint8_t* bitmap = input.buffers[1]->data();
-      arrow::internal::VisitBitBlocksVoid(
-          input.buffers[0], input.offset, input.length,
-          [&](int64_t position) {
-            counts[*g]++;
-            Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
-            g++;
-          },
-          [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+    if (batch[0].is_array()) {
+      const auto& input = *batch[0].array();
+      if (input.MayHaveNulls()) {
+        const uint8_t* bitmap = input.buffers[1]->data();
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[0], input.offset, input.length,
+            [&](int64_t position) {
+              counts[*g]++;
+              Impl::UpdateGroupWith(seen, *g, BitUtil::GetBit(bitmap, position));
+              g++;
+            },
+            [&] { BitUtil::SetBitTo(no_nulls, *g++, false); });
+      } else {
+        arrow::internal::VisitBitBlocksVoid(
+            input.buffers[1], input.offset, input.length,
+            [&](int64_t) {
+              Impl::UpdateGroupWith(seen, *g, true);
+              counts[*g++]++;
+            },
+            [&]() {
+              Impl::UpdateGroupWith(seen, *g, false);
+              counts[*g++]++;
+            });
+      }
     } else {
-      arrow::internal::VisitBitBlocksVoid(
-          input.buffers[1], input.offset, input.length,
-          [&](int64_t) {
-            Impl::UpdateGroupWith(seen, *g, true);
-            counts[*g++]++;
-          },
-          [&]() {
-            Impl::UpdateGroupWith(seen, *g, false);
-            counts[*g++]++;
-          });
+      const auto& input = *batch[0].scalar();
+      if (input.is_valid) {
+        const bool value = UnboxScalar<BooleanType>::Unbox(input);
+        for (int64_t i = 0; i < batch.length; i++) {
+          Impl::UpdateGroupWith(seen, *g, value);
+          counts[*g++]++;
+        }
+      }

Review comment:
       Shouldn't the non-valid case be handled as well? e.g. a loop doing `BitUtil::SetBitTo(no_nulls, *g++, false)`.
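A minimal sketch of what handling both cases could look like, using plain vectors rather than Arrow bitmaps. `BoolGroupState` and `ConsumeScalar` are hypothetical names, and the "any" update stands in for whichever boolean aggregation is in use.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-group state for a boolean "any" aggregation.
struct BoolGroupState {
  std::vector<bool> any;        // running result per group
  std::vector<int64_t> counts;  // number of non-null inputs per group
  std::vector<bool> no_nulls;   // cleared once a group has seen a null

  explicit BoolGroupState(std::size_t num_groups)
      : any(num_groups, false), counts(num_groups, 0), no_nulls(num_groups, true) {}
};

// Consume a scalar that is logically repeated once per entry of group_ids.
// A valid scalar updates the result and the count; a null scalar must still
// be recorded, otherwise the group would wrongly report "no nulls".
void ConsumeScalar(BoolGroupState* state, bool is_valid, bool value,
                   const std::vector<uint32_t>& group_ids) {
  assert(state != nullptr);
  for (uint32_t g : group_ids) {
    assert(g < state->any.size());
    if (is_valid) {
      state->any[g] = state->any[g] || value;
      state->counts[g]++;
    } else {
      state->no_nulls[g] = false;
    }
  }
}
```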

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1700,14 +1802,30 @@ struct GroupedMinMaxImpl : public GroupedAggregator {
     auto raw_mins = reinterpret_cast<CType*>(mins_.mutable_data());
     auto raw_maxes = reinterpret_cast<CType*>(maxes_.mutable_data());
 
-    VisitArrayValuesInline<Type>(
-        *batch[0].array(),
-        [&](CType val) {
+    if (batch[0].is_array()) {
+      VisitArrayValuesInline<Type>(
+          *batch[0].array(),
+          [&](CType val) {
+            raw_maxes[*g] = std::max(raw_maxes[*g], val);
+            raw_mins[*g] = std::min(raw_mins[*g], val);
+            BitUtil::SetBit(has_values_.mutable_data(), *g++);
+          },
+          [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+    } else {
+      const auto& input = *batch[0].scalar();
+      if (input.is_valid) {
+        const auto val = UnboxScalar<Type>::Unbox(input);
+        for (int64_t i = 0; i < batch.length; i++) {
           raw_maxes[*g] = std::max(raw_maxes[*g], val);
           raw_mins[*g] = std::min(raw_mins[*g], val);
           BitUtil::SetBit(has_values_.mutable_data(), *g++);
-        },
-        [&] { BitUtil::SetBit(has_nulls_.mutable_data(), *g++); });
+        }
+      } else {
+        for (int64_t i = 0; i < batch.length; i++) {
+          BitUtil::SetBit(has_nulls_.mutable_data(), *g++);
+        }
+      }
+    }

Review comment:
       So it seems it would be nicer to write this as:
   ```c++
     auto consume_value = [&](uint32_t g, CType val) {
       raw_maxes[g] = std::max(raw_maxes[g], val);
       raw_mins[g] = std::min(raw_mins[g], val);
       BitUtil::SetBit(has_values_.mutable_data(), g);
     };
     auto consume_null = [&](uint32_t g) {
       BitUtil::SetBit(has_nulls_.mutable_data(), g);
     };
     VisitGroupedValues<Type>(batch, std::move(consume_value), std::move(consume_null));
   ```
   

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1211,6 +1249,22 @@ struct GroupedMeanImpl : public GroupedReducingAggregator<Type, GroupedMeanImpl<
     return Status::OK();
   }
 
+  static Status Consume(const Scalar& value, const int64_t count, c_type* reduced,
+                        int64_t* counts, uint8_t* no_nulls, const uint32_t* g) {
+    if (value.is_valid) {
+      const auto v = to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+      for (int i = 0; i < count; i++) {
+        reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) + v);

Review comment:
       Here as well, it would be nice not to have the `static_cast`/`to_unsigned` dance pasted everywhere.

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1138,6 +1160,22 @@ struct GroupedProductImpl final
     return Status::OK();
   }
 
+  static Status Consume(const Scalar& value, const int64_t count, c_type* reduced,
+                        int64_t* counts, uint8_t* no_nulls, const uint32_t* g) {
+    if (value.is_valid) {
+      const auto v = to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+      for (int i = 0; i < count; i++) {
+        reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) * v);

Review comment:
       Why doesn't this use `MultiplyTraits<AccType>::Multiply` instead? It seems like copy/pasting is putting us at risk of latent bugs here. Perhaps you can factor out the raw computation routines?
   ```c++
   using InputCType = typename TypeTraits<Type>::CType;
   
   AccType Reduce(const DataType& type, c_type u, InputCType v) {
     return MultiplyTraits<AccType>::Multiply(type, u, static_cast<c_type>(v));
   }
   
   AccType Reduce(const DataType& type, c_type u, c_type v) {
     return MultiplyTraits<AccType>::Multiply(type, u, v);
   }
   ```
   
   Same for summing:
   ```c++
   using InputCType = typename TypeTraits<Type>::CType;
   
   AccType Reduce(const DataType& type, c_type u, InputCType v) {
     return static_cast<c_type>(to_unsigned(u) + to_unsigned(static_cast<c_type>(v)));
   }
   
   AccType Reduce(const DataType& type, c_type u, c_type v) {
     return static_cast<c_type>(to_unsigned(u) + to_unsigned(v));
   }
   ```
   
   Then you may even reconcile the Sum and Product implementations further.
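
One way the reconciliation hinted at above might look, as a standalone sketch rather than the Arrow implementation: the raw computation is isolated in a small policy (`SumOp`, `ProductOp`), and a single reducer template handles both array and scalar input. All names here are hypothetical, the accumulator is fixed to `int64_t` for brevity, and nulls and counts are left out. Arrow's own `MultiplyTraits` and `to_unsigned` helpers are not used; plain casts to `uint64_t` play the same role of making overflow wrap instead of being undefined.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical policy for summing: the identity element plus the raw
// two-argument reduction, computed on uint64_t so signed overflow wraps.
struct SumOp {
  static constexpr int64_t Identity() { return 0; }
  static int64_t Reduce(int64_t u, int64_t v) {
    return static_cast<int64_t>(static_cast<uint64_t>(u) + static_cast<uint64_t>(v));
  }
};

// Same shape for products; only the identity and the operator differ.
struct ProductOp {
  static constexpr int64_t Identity() { return 1; }
  static int64_t Reduce(int64_t u, int64_t v) {
    return static_cast<int64_t>(static_cast<uint64_t>(u) * static_cast<uint64_t>(v));
  }
};

// One grouped reducer shared by both operations. The cast dance appears
// only inside the policies, never in the consuming loops.
template <typename Op>
class GroupedReducerSketch {
 public:
  explicit GroupedReducerSketch(size_t num_groups)
      : reduced_(num_groups, Op::Identity()) {}

  // Array input: one value per row.
  void Consume(const std::vector<int32_t>& values, const std::vector<uint32_t>& groups) {
    assert(values.size() == groups.size());
    for (size_t i = 0; i < groups.size(); ++i) {
      reduced_[groups[i]] = Op::Reduce(reduced_[groups[i]], values[i]);
    }
  }

  // Scalar input: the same value logically repeated for every row.
  void Consume(int32_t value, const std::vector<uint32_t>& groups) {
    for (uint32_t g : groups) {
      reduced_[g] = Op::Reduce(reduced_[g], value);
    }
  }

  int64_t result(uint32_t g) const { return reduced_[g]; }

 private:
  std::vector<int64_t> reduced_;
};
```

With this split, adding the scalar path costs one extra loop in the shared template instead of one pasted loop per kernel.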

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1032,6 +1169,55 @@ TEST(GroupBy, TDigest) {
       /*verbose=*/true);
 }
 
+TEST(GroupBy, StddevVarianceTDigestScalar) {
+  BatchesWithSchema input;
+  input.batches = {
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[1, 1.0, 1], [1, 1.0, 1], [1, 1.0, 2], [1, 1.0, 3]]"),
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[null, null, 1], [null, null, 1], [null, null, 2], [null, null, 3]]"),
+      ExecBatchFromJSON({int32(), float32(), int64()},
+                        "[[2, 2.0, 1], [3, 3.0, 2], [4, 4.0, 3]]"),
+  };
+  input.schema = schema(
+      {field("argument", int32()), field("argument1", float32()), field("key", int64())});
+
+  for (bool use_threads : {false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {"argument", "argument", "argument",
+                                               "argument1", "argument1", "argument1"},
+                                              {
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                              },
+                                              use_threads, default_exec_context()));
+    Datum expected =
+        ArrayFromJSON(struct_({
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("key", int64()),
+                      }),
+                      R"([
+         [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
+         [1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0], 2],
+         [1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0], 3]

Review comment:
       Isn't tdigest supposed to compute the median here? I would expect 1.0, 2.0 and 2.5 respectively.
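
As a quick check of those numbers: the non-null `argument` values in the test input above are 1, 1, 2 for key 1, then 1, 3 for key 2 and 1, 4 for key 3. The sketch below computes an exact median, averaging the two middle values for even counts. It is not Arrow's t-digest (which is an approximation), and `ExactMedian` is a hypothetical name used only for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Exact median of a non-empty sample. For an even number of values the two
// middle values are averaged.
double ExactMedian(std::vector<double> values) {
  assert(!values.empty());
  std::sort(values.begin(), values.end());
  const size_t n = values.size();
  if (n % 2 == 1) {
    return values[n / 2];
  }
  return (values[n / 2 - 1] + values[n / 2]) / 2.0;
}
```

Applied to the three groups, this gives 1.0, 2.0 and 2.5, which are the values the comment expects.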







[GitHub] [arrow] pitrou commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695992451



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -824,6 +824,36 @@ Status AddHashAggKernels(
   return Status::OK();
 }
 
+template <typename Type, typename ConsumeValue, typename ConsumeNull>
+void VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func,
+                        ConsumeNull&& null_func) {
+  auto g = batch[1].array()->GetValues<uint32_t>(1);
+  if (batch[0].is_array()) {
+    VisitArrayValuesInline<Type>(
+        *batch[0].array(),
+        [&](typename TypeTraits<Type>::CType val) { valid_func(*g++, val); },

Review comment:
       We may capture explicitly when lifetime is at stake, e.g. in async code, where it's important to delineate exactly what needs to survive past the enclosing scope. Here the callable is simply executed synchronously, so listing every capture explicitly is more of a nuisance (both to type and to read).
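
The distinction can be illustrated with a small standalone sketch (hypothetical function names, no Arrow types). In the synchronous case the lambda finishes before any captured local goes out of scope, so `[&]` is safe. In the deferred case the callable outlives the function that created it, so the capture list spells out what must stay alive.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Synchronous: std::for_each runs the lambda immediately, while `total` is
// still alive, so a blanket by-reference capture is fine.
int SumSync(const std::vector<int>& values) {
  int total = 0;
  std::for_each(values.begin(), values.end(), [&](int v) { total += v; });
  return total;
}

// Deferred: the returned callable may run long after this function returns.
// The explicit capture of `values` documents that the lambda shares
// ownership of the data instead of referring to a local.
std::function<int()> MakeDeferredSum(std::shared_ptr<const std::vector<int>> values) {
  assert(values != nullptr);
  return [values] {
    int total = 0;
    for (int v : *values) total += v;
    return total;
  };
}
```

Had the second function used `[&]`, the returned callable would hold a dangling reference to the `values` parameter once the function returned.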



