Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/25 15:13:19 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #10994: ARROW-13737: [C++] Support for grouped aggregation over scalar columns

lidavidm commented on a change in pull request #10994:
URL: https://github.com/apache/arrow/pull/10994#discussion_r695853274



##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate_test.cc
##########
@@ -1032,6 +1169,55 @@ TEST(GroupBy, TDigest) {
       /*verbose=*/true);
 }
 
+TEST(GroupBy, StddevVarianceTDigestScalar) {
+  BatchesWithSchema input;
+  input.batches = {
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[1, 1.0, 1], [1, 1.0, 1], [1, 1.0, 2], [1, 1.0, 3]]"),
+      ExecBatchFromJSON(
+          {ValueDescr::Scalar(int32()), ValueDescr::Scalar(float32()), int64()},
+          "[[null, null, 1], [null, null, 1], [null, null, 2], [null, null, 3]]"),
+      ExecBatchFromJSON({int32(), float32(), int64()},
+                        "[[2, 2.0, 1], [3, 3.0, 2], [4, 4.0, 3]]"),
+  };
+  input.schema = schema(
+      {field("argument", int32()), field("argument1", float32()), field("key", int64())});
+
+  for (bool use_threads : {false}) {
+    SCOPED_TRACE(use_threads ? "parallel/merged" : "serial");
+    ASSERT_OK_AND_ASSIGN(Datum actual,
+                         GroupByUsingExecPlan(input, {"key"},
+                                              {"argument", "argument", "argument",
+                                               "argument1", "argument1", "argument1"},
+                                              {
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                                  {"hash_stddev", nullptr},
+                                                  {"hash_variance", nullptr},
+                                                  {"hash_tdigest", nullptr},
+                                              },
+                                              use_threads, default_exec_context()));
+    Datum expected =
+        ArrayFromJSON(struct_({
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("hash_stddev", float64()),
+                          field("hash_variance", float64()),
+                          field("hash_tdigest", fixed_size_list(float64(), 1)),
+                          field("key", int64()),
+                      }),
+                      R"([
+         [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
+         [1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0], 2],
+         [1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0], 3]

Review comment:
       Oddly enough, that is what our tdigest implementation gives, even in the non-grouped case.
   
   ```
   >>> import pyarrow as pa
   >>> import pyarrow.compute as pc
   >>> pa.__version__
   '5.0.0'
   >>> pc.tdigest([1, 1, 2])
   <pyarrow.lib.DoubleArray object at 0x7f916b5bee80>
   [
     1
   ]
   >>> pc.tdigest([1, 4])
   <pyarrow.lib.DoubleArray object at 0x7f916b5d90a0>
   [
     1
   ]
   >>> pc.tdigest([1, 3])
   <pyarrow.lib.DoubleArray object at 0x7f916b5bee80>
   [
     1
   ]
   ```
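   
   The same behavior can be reproduced directly against the C++ kernel. Here is a minimal sketch, not taken from this PR, assuming the function registered for `pc.tdigest` is the C++ scalar aggregate named "tdigest" and using the generic `CallFunction` API with the `ArrayFromJSON` test helper:
   
   ```
   // Sketch: invoke the non-grouped "tdigest" kernel on a tiny input.
   // With the default TDigestOptions (q = 0.5), the approximation for
   // [1, 1, 2] should come back as [1], matching the Python session above.
   #include "arrow/compute/api.h"
   #include "arrow/testing/gtest_util.h"
   
   arrow::Result<arrow::Datum> TDigestOfSmallInput() {
     auto values = arrow::ArrayFromJSON(arrow::int64(), "[1, 1, 2]");
     return arrow::compute::CallFunction("tdigest", {values});
   }
   ```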

##########
File path: cpp/src/arrow/compute/kernels/hash_aggregate.cc
##########
@@ -1138,6 +1160,22 @@ struct GroupedProductImpl final
     return Status::OK();
   }
 
+  static Status Consume(const Scalar& value, const int64_t count, c_type* reduced,
+                        int64_t* counts, uint8_t* no_nulls, const uint32_t* g) {
+    if (value.is_valid) {
+      const auto v = to_unsigned(static_cast<c_type>(UnboxScalar<Type>::Unbox(value)));
+      for (int i = 0; i < count; i++) {
+        reduced[*g] = static_cast<c_type>(to_unsigned(reduced[*g]) * v);

Review comment:
       Yes, this is a bad merge on my part; I will fix it.
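   
   For reference, one plausible shape for the fix, as a minimal sketch only: it assumes each of the `count` rows carries its own group id in `g`, and that the scalar path should also bump the per-group counts the way the array path does.
   
   ```
   // Hypothetical corrected loop: walk the group ids instead of reusing *g,
   // and record how many rows each group consumed.
   for (int64_t i = 0; i < count; i++) {
     reduced[g[i]] = static_cast<c_type>(to_unsigned(reduced[g[i]]) * v);
     counts[g[i]] += 1;
   }
   ```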




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org