You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "js8544 (via GitHub)" <gi...@apache.org> on 2023/10/13 08:17:01 UTC

Re: [PR] GH-36831: [C++] DictionaryArray support for MinMax Function [arrow]

js8544 commented on code in PR #37100:
URL: https://github.com/apache/arrow/pull/37100#discussion_r1357849884


##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -492,11 +492,32 @@ Result<std::unique_ptr<KernelState>> MinMaxInit(KernelContext* ctx,
   return visitor.Create();
 }
 
+struct AnyExceptDictionaryMatcher : TypeMatcher {

Review Comment:
   I think there's a more general approach for matching non-dictionary types. We can add a `NotMatcher` in kernel.cc with the constructor
   
   ```explicit NotMatcher(std::shared_ptr<TypeMatcher> base_matcher)```
   
   whose `Matches(type)` returns `!base_matcher->Matches(type)`. Then, when registering kernels, we would use `Not(SameTypeId(Type::DICTIONARY))`. This would also be useful if other kernels face the same issue.



##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -516,7 +537,10 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
 
   // Note SIMD level is always NONE, but the convenience kernel will
   // dispatch to an appropriate implementation
-  AddAggKernel(std::move(sig), std::move(init), std::move(finalize), func);
+  AddAggKernel(sig, init, finalize, func);
+
+  sig = KernelSignature::Make({InputType(Type::DICTIONARY)}, DictionaryValueType);

Review Comment:
   Since we use `min_max_func->DispatchExact` in `init`, do we need to separate dictionary and non-dictionary types here?



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
   }
 };
 
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    this->has_nulls = compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+    if (dict->length() == 0) {

Review Comment:
   Should this check happen before updating `has_nulls` and `count`?



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
   }
 };
 
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    this->has_nulls = compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+    if (dict->length() == 0) {
+      return Status::OK();
+    }
+
+    Datum dict_values(std::move(dict));
+    ARROW_ASSIGN_OR_RAISE(
+        Datum result, MinMax(std::move(dict_values), ScalarAggregateOptions::Defaults(),
+                             ctx->exec_context()));
+    const StructScalar& struct_result =
+        checked_cast<const StructScalar&>(*result.scalar());
+    ARROW_ASSIGN_OR_RAISE(auto dict_min, struct_result.field(FieldRef("min")));
+    ARROW_ASSIGN_OR_RAISE(auto dict_max, struct_result.field(FieldRef("max")));
+    ARROW_RETURN_NOT_OK(CompareMinMax(std::move(dict_min), std::move(dict_max), ctx));

Review Comment:
   nit: I would call this function `UpdateMinMaxState` to reflect that it changes the state. `CompareMinMax` sounds like a pure function.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
   }
 };
 
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    this->has_nulls = compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+    if (dict->length() == 0) {
+      return Status::OK();
+    }
+
+    Datum dict_values(std::move(dict));

Review Comment:
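   Moving a const reference has no effect: `dict` is a `const std::shared_ptr<Array>&`, so `std::move(dict)` yields a const rvalue and the `shared_ptr` is still copied.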



##########
cpp/src/arrow/compute/kernels/aggregate_test.cc:
##########
@@ -2047,6 +2047,117 @@ TEST(TestDecimalMinMaxKernel, Decimals) {
   }
 }
 
+TEST(TestDictionaryMinMaxKernel, DictionaryArray) {
+  ScalarAggregateOptions options;
+  std::shared_ptr<arrow::DataType> dict_ty;
+  std::shared_ptr<arrow::DataType> ty;
+  for (const auto& index_type : all_dictionary_index_types()) {
+    ARROW_SCOPED_TRACE("index_type = ", index_type->ToString());
+
+    for (const auto& item_ty_ : {decimal128(5, 2), decimal256(5, 2)}) {

Review Comment:
   We need test cases for integer, float and maybe temporal types too. Many of the test cases here can be reused though.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
   }
 };
 
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    this->has_nulls = compacted_dict_arr.null_count() > 0;

Review Comment:
   ```suggestion
       this->has_nulls |= compacted_dict_arr.null_count() > 0;
   ```



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -1002,6 +1115,11 @@ struct MinMaxInitState {
     return Status::OK();
   }
 
+  Status Visit(const DictionaryType&) {
+    state.reset(new DictionaryMinMaxImpl<SimdLevel>(out_type, options));

Review Comment:
   In `DictionaryMinMaxImpl` there are many function calls on `input.value_type`, such as `MinMax` in `Consume` and `greater`/`less` in `CompareMinMax`. Each call goes through the dispatching process and incurs other function-call overhead. Since the value type is fixed, we can look up its corresponding kernels here using `Function::DispatchExact` and invoke those kernels directly in `DictionaryMinMaxImpl`.
   
   This is also helpful when a user passes an unsupported type such as `dict(list(int32))`: if we dispatch here, we can fail early and return a better error message. Currently the dictionary is compacted first before failing, and the error message is something like `MinMax has no kernel matching list(int32)` instead of showing the real input type `dict(list(int32))`.
   
   But this can be left as a future optimization. You can create an issue and leave this PR as it is.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -912,6 +912,119 @@ struct NullMinMaxImpl : public ScalarAggregator {
   }
 };
 
+template <SimdLevel::type SimdLevel>
+struct DictionaryMinMaxImpl : public ScalarAggregator {
+  using ThisType = DictionaryMinMaxImpl<SimdLevel>;
+
+  DictionaryMinMaxImpl(std::shared_ptr<DataType> out_type, ScalarAggregateOptions options)
+      : options(std::move(options)),
+        out_type(std::move(out_type)),
+        has_nulls(false),
+        count(0),
+        min(nullptr),
+        max(nullptr) {
+    this->options.min_count = std::max<uint32_t>(1, this->options.min_count);
+  }
+
+  Status Consume(KernelContext* ctx, const ExecSpan& batch) override {
+    if (batch[0].is_scalar()) {
+      return Status::NotImplemented("No min/max implemented for DictionaryScalar");
+    }
+
+    DictionaryArray dict_arr(batch[0].array.ToArrayData());
+    ARROW_ASSIGN_OR_RAISE(auto compacted_arr, dict_arr.Compact(ctx->memory_pool()));
+    const DictionaryArray& compacted_dict_arr =
+        checked_cast<const DictionaryArray&>(*compacted_arr);
+    this->has_nulls = compacted_dict_arr.null_count() > 0;
+    this->count += compacted_dict_arr.length() - compacted_dict_arr.null_count();
+
+    const std::shared_ptr<Array>& dict = compacted_dict_arr.dictionary();
+    if (dict->length() == 0) {
+      return Status::OK();
+    }
+
+    Datum dict_values(std::move(dict));
+    ARROW_ASSIGN_OR_RAISE(
+        Datum result, MinMax(std::move(dict_values), ScalarAggregateOptions::Defaults(),
+                             ctx->exec_context()));
+    const StructScalar& struct_result =
+        checked_cast<const StructScalar&>(*result.scalar());
+    ARROW_ASSIGN_OR_RAISE(auto dict_min, struct_result.field(FieldRef("min")));
+    ARROW_ASSIGN_OR_RAISE(auto dict_max, struct_result.field(FieldRef("max")));
+    ARROW_RETURN_NOT_OK(CompareMinMax(std::move(dict_min), std::move(dict_max), ctx));
+    return Status::OK();
+  }
+
+  Status MergeFrom(KernelContext* ctx, KernelState&& src) override {
+    const auto& other = checked_cast<const ThisType&>(src);
+
+    ARROW_RETURN_NOT_OK(CompareMinMax(other.min, other.max, ctx));
+    this->has_nulls = this->has_nulls || other.has_nulls;
+    this->count += other.count;
+    return Status::OK();
+  }
+
+  Status Finalize(KernelContext*, Datum* out) override {
+    const auto& struct_type = checked_cast<const StructType&>(*out_type);
+    const auto& child_type = struct_type.field(0)->type();
+
+    std::vector<std::shared_ptr<Scalar>> values;
+    // Physical type != result type

Review Comment:
   Do we need this check here? I think the primitive kernels guarantee that `this->min`/`this->max` have the same type as `child_type`.



##########
cpp/src/arrow/compute/kernels/aggregate_basic.cc:
##########
@@ -516,7 +537,10 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func,
 
   // Note SIMD level is always NONE, but the convenience kernel will
   // dispatch to an appropriate implementation
-  AddAggKernel(std::move(sig), std::move(init), std::move(finalize), func);
+  AddAggKernel(sig, init, finalize, func);
+
+  sig = KernelSignature::Make({InputType(Type::DICTIONARY)}, DictionaryValueType);

Review Comment:
   And therefore we probably don't need the `AnyExceptDictionaryMatcher` at all.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org