You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 18:50:34 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #12460: ARROW-13530: [C++] Implement cumulative sum compute function

lidavidm commented on code in PR #12460:
URL: https://github.com/apache/arrow/pull/12460#discussion_r844268905


##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -188,6 +188,29 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions {
   NullPlacement null_placement;
 };
 
+/// \brief Options for cumulative sum function
+class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions {
+ public:
+  // Default `start` value is of type Int8 to allow type casting and compatibility rules
+  // to use the type of the function values.
+  explicit CumulativeSumOptions(
+      std::shared_ptr<Scalar> start = std::make_shared<NumericScalar<Int64Type>>(0),

Review Comment:
   nit, but `NumericScalar<Int64Type>` should be written `Int64Scalar`



##########
cpp/src/arrow/compute/api_vector.h:
##########
@@ -188,6 +188,29 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions {
   NullPlacement null_placement;
 };
 
+/// \brief Options for cumulative sum function
+class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions {
+ public:
+  // Default `start` value is of type Int8 to allow type casting and compatibility rules
+  // to use the type of the function values.

Review Comment:
   The default value's type doesn't match this comment



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class

Review Comment:
   Looks like this needs addressing?



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>

Review Comment:
   Are all of these includes really necessary?



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"  // IntegralArrowTypes
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+#include "arrow/ipc/json_simple.h"
+
+namespace arrow {
+namespace compute {
+
+using CumulativeTypes =

Review Comment:
   Can we just use NumericArrowTypes instead here?



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class
+template <typename OutType, typename ArgType, typename Op, typename OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<OptionsType>::Get(ctx);
+    auto skip_nulls = options.skip_nulls;
+
+    // Cast `start` option to match output type
+    // NOTE: Is at least one descriptor guaranteed? If not, need to check size
+    // before indexing.
+    auto out_type = batch.GetDescriptors()[0].type;
+    ARROW_ASSIGN_OR_RAISE(auto cast_value, Cast(Datum(options.start), out_type));
+    auto start = UnboxScalar<OutType>::Unbox(*cast_value.scalar());
+
+    int64_t base_output_offset = 0;
+    bool encounted_null = false;
+    ArrayData* out_arr = out->mutable_array();
+
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto st = Call(ctx, base_output_offset, *batch[0].array(), out_arr, start,
+                       skip_nulls, encounted_null);
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return st;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(Call(ctx, base_output_offset, *chunk->data(), out_arr, start,
+                             skip_nulls, encounted_null));
+          base_output_offset += chunk->length();
+        }
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+  }
+
+  static Status Call(KernelContext* ctx, int64_t base_output_offset,
+                     const ArrayData& input, ArrayData* output, ArgValue& accumulator,
+                     bool skip_nulls, bool& encounted_null) {
+    Status st = Status::OK();
+    auto out_bitmap = output->GetMutableValues<uint8_t>(0);
+    auto out_data = output->GetMutableValues<OutValue>(1) + base_output_offset;
+    auto start_null_idx = input.offset;

Review Comment:
   This should probably be initialized to `0`, not `input.offset`.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class
+template <typename OutType, typename ArgType, typename Op, typename OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<OptionsType>::Get(ctx);
+    auto skip_nulls = options.skip_nulls;
+
+    // Cast `start` option to match output type
+    // NOTE: Is at least one descriptor guaranteed? If not, need to check size
+    // before indexing.
+    auto out_type = batch.GetDescriptors()[0].type;
+    ARROW_ASSIGN_OR_RAISE(auto cast_value, Cast(Datum(options.start), out_type));
+    auto start = UnboxScalar<OutType>::Unbox(*cast_value.scalar());
+
+    int64_t base_output_offset = 0;
+    bool encounted_null = false;
+    ArrayData* out_arr = out->mutable_array();
+
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto st = Call(ctx, base_output_offset, *batch[0].array(), out_arr, start,
+                       skip_nulls, encounted_null);
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return st;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(Call(ctx, base_output_offset, *chunk->data(), out_arr, start,
+                             skip_nulls, encounted_null));
+          base_output_offset += chunk->length();
+        }
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+  }
+
+  static Status Call(KernelContext* ctx, int64_t base_output_offset,
+                     const ArrayData& input, ArrayData* output, ArgValue& accumulator,
+                     bool skip_nulls, bool& encounted_null) {
+    Status st = Status::OK();
+    auto out_bitmap = output->GetMutableValues<uint8_t>(0);
+    auto out_data = output->GetMutableValues<OutValue>(1) + base_output_offset;
+    auto start_null_idx = input.offset;
+    int64_t curr = 0;
+
+    VisitArrayValuesInline<ArgType>(
+        input,
+        [&](ArgValue v) {
+          if (!skip_nulls && encounted_null) {
+            *out_data++ = OutValue{};
+          } else {
+            accumulator =
+                Op::template Call<OutValue, ArgValue, ArgValue>(ctx, v, accumulator, &st);
+            *out_data++ = accumulator;
+            ++start_null_idx;
+          }
+          ++curr;
+        },
+        [&]() {
+          // null
+          *out_data++ = OutValue{};
+          encounted_null = true;
+          arrow::bit_util::SetBitsTo(out_bitmap, base_output_offset + curr, 1, false);
+          ++curr;
+        });
+
+    if (!skip_nulls) {
+      auto null_length = input.length - (start_null_idx - input.offset);

Review Comment:
   Note that `input.length` is independent of `input.offset`, so there's no need to make `start_null_idx` relative to `input.offset`.



##########
cpp/src/arrow/compute/api_vector.cc:
##########
@@ -176,6 +180,22 @@ SelectKOptions::SelectKOptions(int64_t k, std::vector<SortKey> sort_keys)
       sort_keys(std::move(sort_keys)) {}
 constexpr char SelectKOptions::kTypeName[];
 
+// CumulativeGenericOptions::CumulativeGenericOptions(uint64_t start, bool skip_nulls)
+//     : CumulativeGenericOptions(std::make_shared<UInt64Scalar>(start), skip_nulls) {}
+//
+// CumulativeGenericOptions::CumulativeGenericOptions(int64_t start, bool skip_nulls)
+//     : CumulativeGenericOptions(std::make_shared<Int64Scalar>(start), skip_nulls) {}
+//
+// CumulativeGenericOptions::CumulativeGenericOptions(double start, bool skip_nulls)
+//     : CumulativeGenericOptions(std::make_shared<DoubleScalar>(start), skip_nulls) {}
+
+CumulativeGenericOptions::CumulativeGenericOptions(std::shared_ptr<Scalar> start,
+                                                   bool skip_nulls)
+    : FunctionOptions(internal::kCumulativeGenericOptionsType),
+      start(std::move(start)),

Review Comment:
   We should move here because the signature is `shared_ptr<Scalar>`, not `const shared_ptr<Scalar>&`, so we already have a copy.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"  // IntegralArrowTypes
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+#include "arrow/ipc/json_simple.h"
+
+namespace arrow {
+namespace compute {
+
+using CumulativeTypes =

Review Comment:
   Though, this isn't actually used!



##########
cpp/src/arrow/compute/kernels/base_arithmetic_internal.h:
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/compute/kernels/util_internal.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/int_util_internal.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+using internal::AddWithOverflow;
+
+namespace compute {
+namespace internal {
+
+struct Add {
+  template <typename T, typename Arg0, typename Arg1>
+  static constexpr enable_if_floating_value<T> Call(KernelContext*, Arg0 left, Arg1 right,
+                                                    Status*) {
+    return left + right;
+  }
+
+  template <typename T, typename Arg0, typename Arg1>
+  static constexpr enable_if_unsigned_integer_value<T> Call(KernelContext*, Arg0 left,
+                                                            Arg1 right, Status*) {
+    return left + right;
+  }
+
+  template <typename T, typename Arg0, typename Arg1>
+  static constexpr enable_if_signed_integer_value<T> Call(KernelContext*, Arg0 left,
+                                                          Arg1 right, Status*) {
+    return arrow::internal::SafeSignedAdd(left, right);
+  }
+
+  template <typename T, typename Arg0, typename Arg1>
+  static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+    return left + right;
+  }
+};
+
+struct AddChecked {
+  template <typename T, typename Arg0, typename Arg1>
+  static enable_if_integer_value<T> Call(KernelContext*, Arg0 left, Arg1 right,
+                                         Status* st) {
+    static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
+    T result = 0;
+    if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) {
+      *st = Status::Invalid("overflow");
+    }
+    return result;
+  }
+
+  template <typename T, typename Arg0, typename Arg1>
+  static enable_if_floating_value<T> Call(KernelContext*, Arg0 left, Arg1 right,
+                                          Status*) {
+    static_assert(std::is_same<T, Arg0>::value && std::is_same<T, Arg1>::value, "");
+    return left + right;
+  }
+
+  template <typename T, typename Arg0, typename Arg1>
+  static enable_if_decimal_value<T> Call(KernelContext*, Arg0 left, Arg1 right, Status*) {
+    return left + right;
+  }
+};
+
+template <int64_t multiple>
+struct AddTimeDuration {
+  template <typename T, typename Arg0, typename Arg1>
+  static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+    T result =
+        arrow::internal::SafeSignedAdd(static_cast<T>(left), static_cast<T>(right));
+    if (result < 0 || multiple <= result) {
+      *st = Status::Invalid(result, " is not within the acceptable range of ", "[0, ",
+                            multiple, ") s");
+    }
+    return result;
+  }
+};
+
+template <int64_t multiple>
+struct AddTimeDurationChecked {
+  template <typename T, typename Arg0, typename Arg1>
+  static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) {
+    T result = 0;
+    if (ARROW_PREDICT_FALSE(
+            AddWithOverflow(static_cast<T>(left), static_cast<T>(right), &result))) {
+      *st = Status::Invalid("overflow");
+    }
+    if (result < 0 || multiple <= result) {

Review Comment:
   Agreed here.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"  // IntegralArrowTypes
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+#include "arrow/ipc/json_simple.h"

Review Comment:
   I think this shouldn't be necessary; `ArrayFromJSON` et al. come from `gtest_util.h`.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/buffer.h"
+#include "arrow/chunked_array.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/testing/gtest_util.h"  // IntegralArrowTypes
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+#include "arrow/compute/api.h"
+#include "arrow/compute/kernels/test_util.h"
+
+#include "arrow/ipc/json_simple.h"
+
+namespace arrow {
+namespace compute {
+
+using CumulativeTypes =
+    testing::Types<UInt8Type, UInt16Type, UInt32Type, UInt64Type, Int8Type, Int16Type,
+                   Int32Type, Int64Type, FloatType, DoubleType>;
+
+template <typename T, typename OptionsType>
+class TestCumulativeOp : public ::testing::Test {
+ public:
+  using ArrowType = T;
+  using ArrowScalar = typename TypeTraits<T>::ScalarType;

Review Comment:
   nit, but usually this is named ScalarType (and ditto below)



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class
+template <typename OutType, typename ArgType, typename Op, typename OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<OptionsType>::Get(ctx);
+    auto skip_nulls = options.skip_nulls;
+
+    // Cast `start` option to match output type
+    // NOTE: Is at least one descriptor guaranteed? If not, need to check size
+    // before indexing.
+    auto out_type = batch.GetDescriptors()[0].type;
+    ARROW_ASSIGN_OR_RAISE(auto cast_value, Cast(Datum(options.start), out_type));
+    auto start = UnboxScalar<OutType>::Unbox(*cast_value.scalar());
+
+    int64_t base_output_offset = 0;
+    bool encounted_null = false;
+    ArrayData* out_arr = out->mutable_array();
+
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto st = Call(ctx, base_output_offset, *batch[0].array(), out_arr, start,
+                       skip_nulls, encounted_null);
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return st;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(Call(ctx, base_output_offset, *chunk->data(), out_arr, start,
+                             skip_nulls, encounted_null));
+          base_output_offset += chunk->length();
+        }
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+  }
+
+  static Status Call(KernelContext* ctx, int64_t base_output_offset,
+                     const ArrayData& input, ArrayData* output, ArgValue& accumulator,

Review Comment:
   Arrow prefers pointers over non-const references; try `ArgValue* accumulator` and `bool* encountered_null`.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class
+template <typename OutType, typename ArgType, typename Op, typename OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<OptionsType>::Get(ctx);
+    auto skip_nulls = options.skip_nulls;
+
+    // Cast `start` option to match output type
+    // NOTE: Is at least one descriptor guaranteed? If not, need to check size
+    // before indexing.
+    auto out_type = batch.GetDescriptors()[0].type;

Review Comment:
   This is a unary function so you will always get one descriptor. However, note that `out->type()` is already the out type.



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops_test.cc:
##########
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <locale>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/array/builder_decimal.h"
+#include "arrow/buffer.h"

Review Comment:
   These two includes don't seem necessary?



##########
cpp/src/arrow/compute/kernels/vector_cumulative_ops.cc:
##########
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/array_base.h"
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/api_vector.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/kernels/base_arithmetic_internal.h"
+#include "arrow/compute/kernels/codegen_internal.h"
+#include "arrow/compute/kernels/common.h"
+#include "arrow/result.h"
+#include "arrow/util/bit_util.h"
+#include "arrow/visit_type_inline.h"
+
+namespace arrow {
+namespace compute {
+namespace internal {
+
+// NOTE: Missing description of this class
+template <typename OutType, typename ArgType, typename Op, typename OptionsType>
+struct CumulativeGeneric {
+  using OutValue = typename GetOutputType<OutType>::T;
+  using ArgValue = typename GetViewType<ArgType>::T;
+
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    const auto& options = OptionsWrapper<OptionsType>::Get(ctx);
+    auto skip_nulls = options.skip_nulls;
+
+    // Cast `start` option to match output type
+    // NOTE: Is at least one descriptor guaranteed? If not, need to check size
+    // before indexing.
+    auto out_type = batch.GetDescriptors()[0].type;
+    ARROW_ASSIGN_OR_RAISE(auto cast_value, Cast(Datum(options.start), out_type));
+    auto start = UnboxScalar<OutType>::Unbox(*cast_value.scalar());
+
+    int64_t base_output_offset = 0;
+    bool encounted_null = false;
+    ArrayData* out_arr = out->mutable_array();
+
+    switch (batch[0].kind()) {
+      case Datum::ARRAY: {
+        auto st = Call(ctx, base_output_offset, *batch[0].array(), out_arr, start,
+                       skip_nulls, encounted_null);
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return st;
+      }
+      case Datum::CHUNKED_ARRAY: {
+        const auto& input = batch[0].chunked_array();
+
+        for (const auto& chunk : input->chunks()) {
+          RETURN_NOT_OK(Call(ctx, base_output_offset, *chunk->data(), out_arr, start,
+                             skip_nulls, encounted_null));
+          base_output_offset += chunk->length();
+        }
+        out_arr->SetNullCount(arrow::kUnknownNullCount);
+        return Status::OK();
+      }
+      default:
+        return Status::NotImplemented(
+            "Unsupported input type for function 'cumulative_<operator>': ",
+            batch[0].ToString());
+    }
+  }
+
+  static Status Call(KernelContext* ctx, int64_t base_output_offset,
+                     const ArrayData& input, ArrayData* output, ArgValue& accumulator,
+                     bool skip_nulls, bool& encounted_null) {
+    Status st = Status::OK();
+    auto out_bitmap = output->GetMutableValues<uint8_t>(0);
+    auto out_data = output->GetMutableValues<OutValue>(1) + base_output_offset;
+    auto start_null_idx = input.offset;
+    int64_t curr = 0;
+
+    VisitArrayValuesInline<ArgType>(
+        input,
+        [&](ArgValue v) {
+          if (!skip_nulls && encounted_null) {
+            *out_data++ = OutValue{};
+          } else {
+            accumulator =
+                Op::template Call<OutValue, ArgValue, ArgValue>(ctx, v, accumulator, &st);
+            *out_data++ = accumulator;
+            ++start_null_idx;
+          }
+          ++curr;
+        },
+        [&]() {
+          // null
+          *out_data++ = OutValue{};
+          encounted_null = true;
+          arrow::bit_util::SetBitsTo(out_bitmap, base_output_offset + curr, 1, false);

Review Comment:
   INTERSECTION means that the output null bitmap is initialized to the logical AND of all the input null bitmaps, so you'll probably find that the output bit is already set to true wherever the input is not null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org