You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/04/21 13:15:51 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #34912: GH-34911: [C++] Add first and last aggregator

westonpace commented on code in PR #34912:
URL: https://github.com/apache/arrow/pull/34912#discussion_r1173732410


##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -406,7 +414,16 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(
         auto args,
         MakeAggregateNodeArgs(input_schema, keys, segment_keys, aggregates, exec_ctx,
-                              /*concurrency=*/plan->query_context()->max_concurrency()));
+                              /*concurrency=*/concurreny));
+
+    if (concurreny > 1) {

Review Comment:
   Didn't you already test for this in MakeAggregateNodeArgs?



##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -894,6 +954,28 @@ const FunctionDoc mean_doc{
     {"array"},
     "ScalarAggregateOptions"};
 
+const FunctionDoc first_last_doc{"Compute the first and last values of an array",
+                                 ("Do not use this directly. This is internal and might"
+                                  "be removed in the future."),
+                                 {"array"},
+                                 "ScalarAggregateOptions"};
+
+const FunctionDoc first_doc{
+    "Compute the first value in each group",
+    ("Null values are ignored by default.\n"
+     "Currently this should only be used with serial execution because\n"
+     "ordering is otherwise undefined."),

Review Comment:
   I'm not sure this is needed.  This documentation is for someone who wants to use the function.  If they are using the function directly (there are no bindings for this today) then they are presumably passing in all the data at once.  If they are using it through Acero then order is enforced by Acero.



##########
cpp/src/arrow/acero/hash_aggregate_test.cc:
##########
@@ -4206,6 +4206,235 @@ TEST_P(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                     /*verbose=*/true);
 }
 
+TEST_P(GroupBy, FirstLastBasicTypes) {
+  std::vector<std::shared_ptr<DataType>> types;
+  types.insert(types.end(), boolean());
+  types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());
+  types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end());
+
+  const std::vector<std::string> default_table = {R"([
+    [1,    1],
+    [null, 1]
+])",
+                                                  R"([
+    [0,    2],
+    [null, 3],
+    [3,    4],
+    [5,    4],
+    [4,    null],
+    [3,    1],
+    [0,    2]
+])",
+                                                  R"([
+    [0,    2],
+    [1,    null],
+    [null, 3]
+])"};
+
+  const std::string default_expected =
+      R"([
+    [1,    1,    3,    null,   null],
+    [2,    0,    0,    0,   0],
+    [3,    null,  null,  null,  null],
+    [4,    3,     5,    3,   5],
+    [null, 4,     1,    4,   1]
+    ])";
+
+  const std::vector<std::string> date64_table = {R"([
+    [86400000,    1],
+    [null, 1]
+])",
+                                                 R"([
+    [0,    2],
+    [null, 3],
+    [259200000,    4],
+    [432000000,    4],
+    [345600000,    null],
+    [259200000,    1],
+    [0,    2]
+])",
+                                                 R"([
+    [0,    2],
+    [86400000,    null],
+    [null, 3]
+])"};
+
+  const std::string date64_expected =
+      R"([
+    [1,    86400000,259200000,null,null],
+    [2,    0,0,0,0],
+    [3,    null,null,null,null],
+    [4,    259200000,432000000,259200000,432000000],
+    [null, 345600000,86400000,345600000,86400000]
+    ])";
+
+  const std::vector<std::string> boolean_table = {R"([
+    [true,    1],
+    [null, 1]
+])",
+                                                  R"([
+    [false,    2],
+    [null, 3],
+    [false,    4],
+    [true,    4],
+    [true,    null],
+    [false,    1],
+    [false,    2]
+])",
+                                                  R"([
+    [false,    2],
+    [false,    null],
+    [null, 3]
+])"};
+
+  const std::string boolean_expected =
+      R"([
+    [1,    true,false,null,null],
+    [2,    false,false,false,false],
+    [3,    null,null,null,null],
+    [4,    false,true,false,true],
+    [null, true,false,true,false]
+    ])";
+
+  auto skip_nulls = std::make_shared<ScalarAggregateOptions>(false, 1);

Review Comment:
   ```suggestion
     auto dont_skip_nulls = std::make_shared<ScalarAggregateOptions>(false, 1);
   ```



##########
cpp/src/arrow/compute/function_test.cc:
##########
@@ -323,7 +323,7 @@ TEST(ScalarAggregateFunction, DispatchExact) {
 
   std::vector<InputType> in_args = {int8()};
   ScalarAggregateKernel kernel(std::move(in_args), int64(), NoopInit, NoopConsume,
-                               NoopMerge, NoopFinalize);
+                               NoopMerge, NoopFinalize, /*ordered*/ false);

Review Comment:
   ```suggestion
                                  NoopMerge, NoopFinalize, /*ordered=*/ false);
   ```
   Minor consistency nit



##########
cpp/src/arrow/acero/hash_aggregate_test.cc:
##########
@@ -4206,6 +4206,235 @@ TEST_P(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                     /*verbose=*/true);
 }
 
+TEST_P(GroupBy, FirstLastBasicTypes) {
+  std::vector<std::shared_ptr<DataType>> types;
+  types.insert(types.end(), boolean());
+  types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());
+  types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end());
+
+  const std::vector<std::string> default_table = {R"([

Review Comment:
   Maybe `numeric_table`?



##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -894,6 +954,28 @@ const FunctionDoc mean_doc{
     {"array"},
     "ScalarAggregateOptions"};
 
+const FunctionDoc first_last_doc{"Compute the first and last values of an array",
+                                 ("Do not use this directly. This is internal and might"
+                                  "be removed in the future."),

Review Comment:
   We can probably remove this warning.  I do believe that "always calculating the first and last in one pass" is not ideal when the user just wants the first or just wants the last (e.g. compared to min/max where you have to iterate the entire array no matter what).
   
   However, I do think there will still be scenarios where a user will want both the first and last in a single pass.  So let's go ahead and assume this function is here to stay.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +273,120 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
 };
 
 // ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable = void>
+struct FirstLastState {};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastState<ArrowType, SimdLevel, enable_if_floating_point<ArrowType>> {
+  using ThisType = FirstLastState<ArrowType, SimdLevel>;
+  using T = typename ArrowType::c_type;
+  using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
+
+  ThisType& operator+=(const ThisType& rhs) {
+    this->has_nulls |= rhs.has_nulls;
+    this->first = this->first.has_value() ? this->first : rhs.first;
+    this->last = rhs.last.has_value() ? rhs.last : this->last;
+    return *this;
+  }
+
+  void MergeOne(T value) {
+    if (!this->first.has_value()) {
+      this->first = value;
+    }
+    this->last = value;
+  }
+
+  std::optional<T> first = std::nullopt;
+  std::optional<T> last = std::nullopt;
+  bool has_nulls = false;
+};
+
+template <typename ArrowType, SimdLevel::type SimdLevel>
+struct FirstLastImpl : public ScalarAggregator {
+  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
+  using ThisType = FirstLastImpl<ArrowType, SimdLevel>;
+  using StateType = FirstLastState<ArrowType, SimdLevel>;
+
+  FirstLastImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : out_type(std::move(out_type)), options(std::move(options)), count(0) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext*, const ExecSpan& batch) override {
+    if (batch[0].is_array()) {

Review Comment:
   Ah.  The function registry has a bit of an (intentional but perhaps not well documented) quirk.  If all arguments are scalar then the function registry should automatically convert them to arrays.  Perhaps this does not work with aggregate functions though?  You might check with a debugger but I don't think this branch would be hit even if you passed in a scalar input.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -272,8 +272,230 @@ struct MeanKernelInit : public SumLikeInit<KernelClass> {
 };
 
 // ----------------------------------------------------------------------
-// MinMax implementation
+// Last implementation
+template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable = void>

Review Comment:
   ```suggestion
   template <typename ArrowType, typename Enable = void>
   ```
   As far as I can tell `SimdLevel` is completely unused.



##########
cpp/src/arrow/acero/aggregate_node.cc:
##########
@@ -716,6 +733,15 @@ class GroupByNode : public ExecNode, public TracedNode {
     ARROW_ASSIGN_OR_RAISE(auto args, MakeAggregateNodeArgs(input_schema, keys,
                                                            segment_keys, aggs, exec_ctx));
 
+    if (concurreny > 1) {

Review Comment:
   Ditto.  I think this is already tested in MakeAggregateNodeArgs.



##########
cpp/src/arrow/acero/hash_aggregate_test.cc:
##########
@@ -4206,6 +4206,235 @@ TEST_P(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                     /*verbose=*/true);
 }
 
+TEST_P(GroupBy, FirstLastBasicTypes) {
+  std::vector<std::shared_ptr<DataType>> types;
+  types.insert(types.end(), boolean());
+  types.insert(types.end(), NumericTypes().begin(), NumericTypes().end());
+  types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end());
+
+  const std::vector<std::string> default_table = {R"([
+    [1,    1],
+    [null, 1]
+])",
+                                                  R"([
+    [0,    2],
+    [null, 3],
+    [3,    4],
+    [5,    4],
+    [4,    null],
+    [3,    1],
+    [0,    2]
+])",
+                                                  R"([
+    [0,    2],
+    [1,    null],
+    [null, 3]
+])"};
+
+  const std::string default_expected =
+      R"([
+    [1,    1,    3,    null,   null],

Review Comment:
   It seems like you've interpreted `skip_nulls=false` to mean "if a null is present in the source array then set the value to null".
   
   However, shouldn't it be "if the first/last value in the array is null then set first/last to null"?
   
   In other words, shouldn't `First(x, skip_nulls=false)` be identical to `x[0]`?



##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -439,6 +440,46 @@ struct ProductInit {
   }
 };
 
+// ----------------------------------------------------------------------
+// FirstLast implementation
+
+Result<std::unique_ptr<KernelState>> FirstLastInit(KernelContext* ctx,
+                                                   const KernelInitArgs& args) {
+  ARROW_ASSIGN_OR_RAISE(TypeHolder out_type,
+                        args.kernel->signature->out_type().Resolve(ctx, args.inputs));
+
+  FirstLastInitState<SimdLevel::NONE> visitor(
+      ctx, *args.inputs[0], out_type.GetSharedPtr(),
+      static_cast<const ScalarAggregateOptions&>(*args.options));
+  return visitor.Create();
+}
+
+// For "first" and "last" functions: override finanlize and return the actual value

Review Comment:
   ```suggestion
   // For "first" and "last" functions: override finalize and return the actual value
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org