You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/26 13:36:29 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10390: ARROW-12751: [C++] Implement minimum/maximum kernels

pitrou commented on a change in pull request #10390:
URL: https://github.com/apache/arrow/pull/10390#discussion_r639703591



##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +565,190 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const size_t scalar_count =
+        static_cast<size_t>(std::count_if(batch.values.begin(), batch.values.end(),
+                                          [](const Datum& d) { return d.is_scalar(); }));
+    if (scalar_count == batch.values.size()) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // At least one array, two or more arguments
+    ArrayDataVector arrays;
+    for (const auto& arg : batch.values) {
+      if (!arg.is_array()) continue;
+      arrays.push_back(arg.array());
+    }
+
+    if (scalar_count > 0) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        arrays.push_back(array->data());
+      } else if (!options.skip_nulls) {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    }
+
+    // Exactly one array to consider (output = input)
+    if (arrays.size() == 1) {
+      *output = *arrays[0];
+      return Status::OK();
+    }
+
+    // Two or more arrays to consider
+    if (scalar_count > 0) {
+      // We allocated the last array from a scalar: recycle it as the output

Review comment:
       Even if the scalar was null and `options.skip_nulls` is true?

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +565,190 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const size_t scalar_count =
+        static_cast<size_t>(std::count_if(batch.values.begin(), batch.values.end(),
+                                          [](const Datum& d) { return d.is_scalar(); }));
+    if (scalar_count == batch.values.size()) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // At least one array, two or more arguments
+    ArrayDataVector arrays;
+    for (const auto& arg : batch.values) {
+      if (!arg.is_array()) continue;
+      arrays.push_back(arg.array());
+    }
+
+    if (scalar_count > 0) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        arrays.push_back(array->data());
+      } else if (!options.skip_nulls) {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    }
+
+    // Exactly one array to consider (output = input)
+    if (arrays.size() == 1) {
+      *output = *arrays[0];
+      return Status::OK();
+    }
+
+    // Two or more arrays to consider
+    if (scalar_count > 0) {
+      // We allocated the last array from a scalar: recycle it as the output
+      *output = *arrays.back();
+    } else {
+      // Copy last array argument to output array
+      const ArrayData& input = *arrays.back();
+      if (options.skip_nulls && input.buffers[0]) {
+        // Don't copy the bitmap if !options.skip_nulls since we'll precompute it later
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset,
+                                      batch.length, output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(batch.length * sizeof(OutValue)));
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  batch.length * sizeof(OutValue));
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));

Review comment:
       This will allocate a null bitmap even if all input arrays have 0 nulls.

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
##########
@@ -312,6 +313,89 @@ class TestBinaryArithmeticUnsigned : public TestBinaryArithmeticIntegral<T> {};
 template <typename T>
 class TestBinaryArithmeticFloating : public TestBinaryArithmetic<T> {};
 
+template <typename T>
+class TestVarArgsArithmetic : public TestBase {
+ protected:
+  static std::shared_ptr<DataType> type_singleton() {
+    return TypeTraits<T>::type_singleton();
+  }
+
+  using VarArgsFunction = std::function<Result<Datum>(
+      const std::vector<Datum>&, ElementWiseAggregateOptions, ExecContext*)>;
+
+  Datum scalar(const std::string& value) {
+    return ScalarFromJSON(type_singleton(), value);
+  }
+
+  Datum array(const std::string& value) { return ArrayFromJSON(type_singleton(), value); }
+
+  Datum Eval(VarArgsFunction func, const std::vector<Datum>& args) {
+    EXPECT_OK_AND_ASSIGN(auto actual,
+                         func(args, element_wise_aggregate_options_, nullptr));
+    if (actual.is_array()) {
+      auto arr = actual.make_array();
+      ARROW_EXPECT_OK(arr->ValidateFull());
+    }
+    return actual;
+  }
+
+  void AssertNullScalar(VarArgsFunction func, const std::vector<Datum>& args) {
+    auto datum = this->Eval(func, args);
+    ASSERT_TRUE(datum.is_scalar());
+    ASSERT_FALSE(datum.scalar()->is_valid);
+  }
+
+  void Assert(VarArgsFunction func, Datum expected, const std::vector<Datum>& args) {
+    std::stringstream ss;
+    ss << "Inputs:";
+    for (const auto& arg : args) {
+      ss << ' ';
+      if (arg.is_scalar())
+        ss << arg.scalar()->ToString();
+      else if (arg.is_array())
+        ss << arg.make_array()->ToString();

Review comment:
       Uh. I don't think generating string representations for each and every test is a good idea.

##########
File path: docs/source/cpp/compute.rst
##########
@@ -310,6 +310,20 @@ output element is null.
 | less, less_equal         |            |                                             |                     |
 +--------------------------+------------+---------------------------------------------+---------------------+
 
+These functions take any number of inputs of numeric type (in which case they
+will be cast to the :ref:`common numeric type <common-numeric-type>` before
+comparison) or of temporal types. If any input is dictionary encoded it will be
+expanded for the purposes of comparison.
+
++--------------------------+------------+---------------------------------------------+---------------------+-------+
+| Function names           | Arity      | Input types                                 | Output type         | Notes |

Review comment:
       Should add a column for the options class.

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +565,190 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const size_t scalar_count =
+        static_cast<size_t>(std::count_if(batch.values.begin(), batch.values.end(),
+                                          [](const Datum& d) { return d.is_scalar(); }));
+    if (scalar_count == batch.values.size()) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // At least one array, two or more arguments
+    ArrayDataVector arrays;
+    for (const auto& arg : batch.values) {
+      if (!arg.is_array()) continue;
+      arrays.push_back(arg.array());
+    }
+
+    if (scalar_count > 0) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        arrays.push_back(array->data());
+      } else if (!options.skip_nulls) {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    }
+
+    // Exactly one array to consider (output = input)
+    if (arrays.size() == 1) {
+      *output = *arrays[0];
+      return Status::OK();
+    }
+
+    // Two or more arrays to consider
+    if (scalar_count > 0) {
+      // We allocated the last array from a scalar: recycle it as the output
+      *output = *arrays.back();
+    } else {
+      // Copy last array argument to output array
+      const ArrayData& input = *arrays.back();
+      if (options.skip_nulls && input.buffers[0]) {
+        // Don't copy the bitmap if !options.skip_nulls since we'll precompute it later
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset,
+                                      batch.length, output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(batch.length * sizeof(OutValue)));
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  batch.length * sizeof(OutValue));
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+      bool first = true;
+      for (const auto& arr : arrays) {
+        if (!arr->buffers[0]) continue;
+        if (first) {

Review comment:
       Instead of `if (first)`, you could move the bitmap allocation below and use `if (!output->buffers[0])`.

##########
File path: cpp/src/arrow/ipc/json_simple.cc
##########
@@ -911,6 +912,26 @@ Status DictArrayFromJSON(const std::shared_ptr<DataType>& type,
       .Value(out);
 }
 
+Status ScalarFromJSON(const std::shared_ptr<DataType>& type,
+                      util::string_view json_string, std::shared_ptr<Scalar>* out) {
+  std::shared_ptr<Converter> converter;
+  RETURN_NOT_OK(GetConverter(type, &converter));
+
+  rj::Document json_doc;
+  json_doc.Parse<kParseFlags>(json_string.data(), json_string.length());
+  if (json_doc.HasParseError()) {
+    return Status::Invalid("JSON parse error at offset ", json_doc.GetErrorOffset(), ": ",
+                           GetParseError_En(json_doc.GetParseError()));
+  }
+
+  std::shared_ptr<Array> array;
+  RETURN_NOT_OK(converter->AppendValue(json_doc));
+  RETURN_NOT_OK(converter->Finish(&array));
+  DCHECK_GE(array->length(), 0);

Review comment:
       Shouldn't we ensure it's equal to 1?

##########
File path: cpp/src/arrow/compute/kernels/codegen_internal.h
##########
@@ -1093,6 +1097,41 @@ ArrayKernelExec GeneratePhysicalInteger(detail::GetTypeId get_id) {
   }
 }
 
+template <template <typename... Args> class Generator, typename... Args>
+ArrayKernelExec GeneratePhysicalNumeric(detail::GetTypeId get_id) {
+  switch (get_id.id) {
+    case Type::INT8:
+      return Generator<Int8Type, Args...>::Exec;
+    case Type::INT16:
+      return Generator<Int16Type, Args...>::Exec;
+    case Type::INT32:
+    case Type::DATE32:
+    case Type::TIME32:
+      return Generator<Int32Type, Args...>::Exec;
+    case Type::INT64:
+    case Type::DATE64:
+    case Type::TIMESTAMP:
+    case Type::TIME64:
+    case Type::DURATION:
+      return Generator<Int64Type, Args...>::Exec;
+    case Type::UINT8:
+      return Generator<UInt8Type, Args...>::Exec;
+    case Type::UINT16:
+      return Generator<UInt16Type, Args...>::Exec;
+    case Type::UINT32:
+      return Generator<UInt32Type, Args...>::Exec;
+    case Type::UINT64:
+      return Generator<UInt64Type, Args...>::Exec;
+    case Type::FLOAT:
+      return Generator<FloatType, Args...>::Exec;
+    case Type::DOUBLE:
+      return Generator<DoubleType, Args...>::Exec;

Review comment:
       Perhaps, but the discrepancy seems a bit gratuitous.

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +565,190 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const size_t scalar_count =
+        static_cast<size_t>(std::count_if(batch.values.begin(), batch.values.end(),
+                                          [](const Datum& d) { return d.is_scalar(); }));
+    if (scalar_count == batch.values.size()) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // At least one array, two or more arguments
+    ArrayDataVector arrays;
+    for (const auto& arg : batch.values) {
+      if (!arg.is_array()) continue;
+      arrays.push_back(arg.array());
+    }
+
+    if (scalar_count > 0) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        arrays.push_back(array->data());
+      } else if (!options.skip_nulls) {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    }
+
+    // Exactly one array to consider (output = input)
+    if (arrays.size() == 1) {
+      *output = *arrays[0];
+      return Status::OK();
+    }
+
+    // Two or more arrays to consider
+    if (scalar_count > 0) {
+      // We allocated the last array from a scalar: recycle it as the output
+      *output = *arrays.back();
+    } else {
+      // Copy last array argument to output array
+      const ArrayData& input = *arrays.back();
+      if (options.skip_nulls && input.buffers[0]) {
+        // Don't copy the bitmap if !options.skip_nulls since we'll precompute it later
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset,
+                                      batch.length, output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(batch.length * sizeof(OutValue)));
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  batch.length * sizeof(OutValue));
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+      bool first = true;
+      for (const auto& arr : arrays) {
+        if (!arr->buffers[0]) continue;

Review comment:
       Use `arr->null_count() == 0`?

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
##########
@@ -1161,5 +1255,242 @@ TYPED_TEST(TestUnaryArithmeticFloating, AbsoluteValue) {
   }
 }
 
+TYPED_TEST(TestVarArgsArithmeticNumeric, Minimum) {
+  this->AssertNullScalar(Minimum, {});
+  this->AssertNullScalar(Minimum, {this->scalar("null"), this->scalar("null")});
+
+  this->Assert(Minimum, this->scalar("0"), {this->scalar("0")});
+  this->Assert(Minimum, this->scalar("0"),
+               {this->scalar("2"), this->scalar("0"), this->scalar("1")});
+  this->Assert(
+      Minimum, this->scalar("0"),
+      {this->scalar("2"), this->scalar("0"), this->scalar("1"), this->scalar("null")});
+  this->Assert(Minimum, this->scalar("1"),
+               {this->scalar("null"), this->scalar("null"), this->scalar("1"),
+                this->scalar("null")});
+
+  this->Assert(Minimum, (this->array("[]")), {this->array("[]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, null]"), {this->array("[1, 2, 3, null]")});
+
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2"), this->scalar("4")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("null"), this->scalar("2")});
+
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, 2, 2, 2]")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, null, 2, 2]")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->array("[2, 2, 2, 2]")});
+
+  this->Assert(Minimum, this->array("[1, 2, null, 6]"),
+               {this->array("[1, 2, null, null]"), this->array("[4, null, null, 6]")});
+  this->Assert(Minimum, this->array("[1, 2, null, 6]"),
+               {this->array("[4, null, null, 6]"), this->array("[1, 2, null, null]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, 4]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, 4]"),
+               {this->array("[null, null, null, null]"), this->array("[1, 2, 3, 4]")});
+
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("1"), this->array("[1, 2, 3, 4]")});
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("1"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("null"), this->array("[1, 1, 1, 1]")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("null"), this->array("[null, null, null, null]")});
+
+  // Test null handling
+  this->element_wise_aggregate_options_.skip_nulls = false;
+  this->AssertNullScalar(Minimum, {this->scalar("null"), this->scalar("null")});
+  this->AssertNullScalar(Minimum, {this->scalar("0"), this->scalar("null")});
+
+  this->Assert(Minimum, this->array("[1, null, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2"), this->scalar("4")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("null"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, null, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, null, 2, 2]")});
+
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("1"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("null"), this->array("[1, 1, 1, 1]")});
+}
+
+TYPED_TEST(TestVarArgsArithmeticFloating, Minimum) {
+  this->SetNansEqual();

Review comment:
       Why isn't this done by default in the test setup?

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic.cc
##########
@@ -516,6 +565,190 @@ std::shared_ptr<ScalarFunction> MakeUnarySignedArithmeticFunctionNotNull(
   return func;
 }
 
+using MinMaxState = OptionsWrapper<ElementWiseAggregateOptions>;
+
+// Implement a variadic scalar min/max kernel.
+template <typename OutType, typename Op>
+struct ScalarMinMax {
+  using OutValue = typename GetOutputType<OutType>::T;
+
+  static void ExecScalar(const ExecBatch& batch,
+                         const ElementWiseAggregateOptions& options, Scalar* out) {
+    // All arguments are scalar
+    OutValue value{};
+    bool valid = false;
+    for (const auto& arg : batch.values) {
+      // Ignore non-scalar arguments so we can use it in the mixed-scalar-and-array case
+      if (!arg.is_scalar()) continue;
+      const auto& scalar = *arg.scalar();
+      if (!scalar.is_valid) {
+        if (options.skip_nulls) continue;
+        out->is_valid = false;
+        return;
+      }
+      if (!valid) {
+        value = UnboxScalar<OutType>::Unbox(scalar);
+        valid = true;
+      } else {
+        value = Op::Call(value, UnboxScalar<OutType>::Unbox(scalar));
+      }
+    }
+    out->is_valid = valid;
+    if (valid) {
+      BoxScalar<OutType>::Box(value, out);
+    }
+  }
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const ElementWiseAggregateOptions& options = MinMaxState::Get(ctx);
+    const auto descrs = batch.GetDescriptors();
+    const size_t scalar_count =
+        static_cast<size_t>(std::count_if(batch.values.begin(), batch.values.end(),
+                                          [](const Datum& d) { return d.is_scalar(); }));
+    if (scalar_count == batch.values.size()) {
+      ExecScalar(batch, options, out->scalar().get());
+      return Status::OK();
+    }
+
+    ArrayData* output = out->mutable_array();
+
+    // At least one array, two or more arguments
+    ArrayDataVector arrays;
+    for (const auto& arg : batch.values) {
+      if (!arg.is_array()) continue;
+      arrays.push_back(arg.array());
+    }
+
+    if (scalar_count > 0) {
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
+                            MakeScalar(out->type(), 0));
+      ExecScalar(batch, options, temp_scalar.get());
+      if (temp_scalar->is_valid) {
+        // Promote to output array
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        arrays.push_back(array->data());
+      } else if (!options.skip_nulls) {
+        // Abort early
+        ARROW_ASSIGN_OR_RAISE(auto array, MakeArrayFromScalar(*temp_scalar, batch.length,
+                                                              ctx->memory_pool()));
+        *output = *array->data();
+        return Status::OK();
+      }
+    }
+
+    // Exactly one array to consider (output = input)
+    if (arrays.size() == 1) {
+      *output = *arrays[0];
+      return Status::OK();
+    }
+
+    // Two or more arrays to consider
+    if (scalar_count > 0) {
+      // We allocated the last array from a scalar: recycle it as the output
+      *output = *arrays.back();
+    } else {
+      // Copy last array argument to output array
+      const ArrayData& input = *arrays.back();
+      if (options.skip_nulls && input.buffers[0]) {
+        // Don't copy the bitmap if !options.skip_nulls since we'll precompute it later
+        ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+        ::arrow::internal::CopyBitmap(input.buffers[0]->data(), input.offset,
+                                      batch.length, output->buffers[0]->mutable_data(),
+                                      /*dest_offset=*/0);
+      }
+      // This won't work for nested or variable-sized types
+      ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                            ctx->Allocate(batch.length * sizeof(OutValue)));
+      std::memcpy(output->buffers[1]->mutable_data(),
+                  input.buffers[1]->data() + (input.offset * sizeof(OutValue)),
+                  batch.length * sizeof(OutValue));
+    }
+
+    if (!options.skip_nulls) {
+      // We can precompute the validity buffer in this case
+      // AND together the validity buffers of all arrays
+      ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(batch.length));
+      bool first = true;
+      for (const auto& arr : arrays) {
+        if (!arr->buffers[0]) continue;
+        if (first) {
+          ::arrow::internal::CopyBitmap(arr->buffers[0]->data(), arr->offset,
+                                        batch.length, output->buffers[0]->mutable_data(),
+                                        /*dest_offset=*/0);
+          first = false;
+        } else {
+          ::arrow::internal::BitmapAnd(output->buffers[0]->data(), /*left_offset=*/0,
+                                       arr->buffers[0]->data(), arr->offset, batch.length,
+                                       /*out_offset=*/0,
+                                       output->buffers[0]->mutable_data());
+        }
+      }
+    }
+    arrays.pop_back();
+
+    for (const auto& array : arrays) {
+      OutputArrayWriter<OutType> writer(out->mutable_array());
+      ArrayIterator<OutType> out_it(*output);
+      int64_t index = 0;
+      VisitArrayValuesInline<OutType>(
+          *array,
+          [&](OutValue value) {
+            auto u = out_it();
+            if (!output->buffers[0] ||
+                BitUtil::GetBit(output->buffers[0]->data(), index)) {
+              writer.Write(Op::Call(u, value));
+            } else {
+              writer.Write(value);
+            }
+            index++;
+          },
+          [&]() {
+            // RHS is null, preserve the LHS
+            writer.values++;
+            index++;
+            out_it();
+          });
+      // When skipping nulls, we incrementally compute the validity buffer
+      if (options.skip_nulls && output->buffers[0]) {
+        if (array->buffers[0]) {
+          ::arrow::internal::BitmapOr(
+              output->buffers[0]->data(), /*left_offset=*/0, array->buffers[0]->data(),
+              /*right_offset=*/array->offset, batch.length, /*out_offset=*/0,
+              output->buffers[0]->mutable_data());
+        } else {
+          output->buffers[0] = nullptr;
+        }
+      }
+    }
+    output->null_count = -1;

Review comment:
       Even if there is no validity bitmap?

##########
File path: cpp/src/arrow/compute/kernels/scalar_arithmetic_test.cc
##########
@@ -1161,5 +1255,242 @@ TYPED_TEST(TestUnaryArithmeticFloating, AbsoluteValue) {
   }
 }
 
+TYPED_TEST(TestVarArgsArithmeticNumeric, Minimum) {
+  this->AssertNullScalar(Minimum, {});
+  this->AssertNullScalar(Minimum, {this->scalar("null"), this->scalar("null")});
+
+  this->Assert(Minimum, this->scalar("0"), {this->scalar("0")});
+  this->Assert(Minimum, this->scalar("0"),
+               {this->scalar("2"), this->scalar("0"), this->scalar("1")});
+  this->Assert(
+      Minimum, this->scalar("0"),
+      {this->scalar("2"), this->scalar("0"), this->scalar("1"), this->scalar("null")});
+  this->Assert(Minimum, this->scalar("1"),
+               {this->scalar("null"), this->scalar("null"), this->scalar("1"),
+                this->scalar("null")});
+
+  this->Assert(Minimum, (this->array("[]")), {this->array("[]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, null]"), {this->array("[1, 2, 3, null]")});
+
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2"), this->scalar("4")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("null"), this->scalar("2")});
+
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, 2, 2, 2]")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, null, 2, 2]")});
+  this->Assert(Minimum, this->array("[1, 2, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->array("[2, 2, 2, 2]")});
+
+  this->Assert(Minimum, this->array("[1, 2, null, 6]"),
+               {this->array("[1, 2, null, null]"), this->array("[4, null, null, 6]")});
+  this->Assert(Minimum, this->array("[1, 2, null, 6]"),
+               {this->array("[4, null, null, 6]"), this->array("[1, 2, null, null]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, 4]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[1, 2, 3, 4]"),
+               {this->array("[null, null, null, null]"), this->array("[1, 2, 3, 4]")});
+
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("1"), this->array("[1, 2, 3, 4]")});
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("1"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[1, 1, 1, 1]"),
+               {this->scalar("null"), this->array("[1, 1, 1, 1]")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("null"), this->array("[null, null, null, null]")});
+
+  // Test null handling
+  this->element_wise_aggregate_options_.skip_nulls = false;
+  this->AssertNullScalar(Minimum, {this->scalar("null"), this->scalar("null")});
+  this->AssertNullScalar(Minimum, {this->scalar("0"), this->scalar("null")});
+
+  this->Assert(Minimum, this->array("[1, null, 2, 2]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("2"), this->scalar("4")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->array("[1, null, 3, 4]"), this->scalar("null"), this->scalar("2")});
+  this->Assert(Minimum, this->array("[1, null, 2, 2]"),
+               {this->array("[1, 2, 3, 4]"), this->array("[2, null, 2, 2]")});
+
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("1"), this->array("[null, null, null, null]")});
+  this->Assert(Minimum, this->array("[null, null, null, null]"),
+               {this->scalar("null"), this->array("[1, 1, 1, 1]")});
+}
+
+TYPED_TEST(TestVarArgsArithmeticFloating, Minimum) {
+  this->SetNansEqual();
+  this->Assert(Maximum, this->scalar("0"), {this->scalar("0"), this->scalar("NaN")});
+  this->Assert(Maximum, this->scalar("0"), {this->scalar("NaN"), this->scalar("0")});
+  this->Assert(Maximum, this->scalar("Inf"), {this->scalar("Inf"), this->scalar("NaN")});
+  this->Assert(Maximum, this->scalar("Inf"), {this->scalar("NaN"), this->scalar("Inf")});
+  this->Assert(Maximum, this->scalar("-Inf"),
+               {this->scalar("-Inf"), this->scalar("NaN")});
+  this->Assert(Maximum, this->scalar("-Inf"),
+               {this->scalar("NaN"), this->scalar("-Inf")});
+  this->Assert(Maximum, this->scalar("NaN"), {this->scalar("NaN"), this->scalar("null")});
+  this->Assert(Minimum, this->scalar("0"), {this->scalar("0"), this->scalar("Inf")});
+  this->Assert(Minimum, this->scalar("-Inf"), {this->scalar("0"), this->scalar("-Inf")});

Review comment:
       Can you add some test with `-0.0` and `0.0`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org