You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/13 05:38:49 UTC

[GitHub] [arrow] cyb70289 opened a new pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

cyb70289 opened a new pull request #10009:
URL: https://github.com/apache/arrow/pull/10009


   The Arrow mode kernel performs poorly compared with scipy.stats.mode
   (based on numpy.unique). The Arrow mode kernel stores value:count pairs
   in a map, while numpy.unique sorts the input array and then counts runs
   of adjacent equal values. Per my test, the map approach only wins when
   there are many duplicated values (length / value_range > 100), which
   does not look very useful in practice.
   
   This patch rewrites the mode kernel to use the sort-and-count approach
   for floating-point values and for integers with a wide value range. A 2x
   performance improvement is observed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612891573



##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
 
 namespace {
 
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
 constexpr char kModeFieldName[] = "mode";
 constexpr char kCountFieldName[] = "count";
 
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array, int64_t& nan_count) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  nan_count = 0;
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             if (std::isnan(value)) {
-                                               ++nan_count;
-                                             } else {
-                                               ++value_counts_map[value];
-                                             }
-                                           }
-                                         });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+                                                  Datum* out) {
+  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  const auto& count_type = int64();
+
+  auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+  mode_data->buffers.resize(2, nullptr);
+  auto count_data = ArrayData::Make(count_type, n, 0);
+  count_data->buffers.resize(2, nullptr);
+
+  CType* mode_buffer = nullptr;
+  int64_t* count_buffer = nullptr;
+
+  if (n > 0) {
+    ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+    ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+    mode_buffer = mode_data->template GetMutableValues<CType>(1);
+    count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }
 
-  return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_map[values[pos + i]];
-                                           }
-                                         });
-  }
+  const auto& out_type =
+      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
 
-  return value_counts_map;
+  return std::make_pair(mode_buffer, count_buffer);
 }
 
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
-  const int range = static_cast<int>(max - min);
-  DCHECK(range >= 0 && range < 64 * 1024 * 1024);
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  std::vector<int64_t> value_counts_vector(range + 1);
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_vector[values[pos + i] - min];
-                                           }
-                                         });
-  }
-
-  // Transfer value counts to a map to be consistent with other chunks
-  CounterMap<CType> value_counts_map(range + 1);
-  for (int i = 0; i <= range; ++i) {
-    CType value = static_cast<CType>(i + min);
-    int64_t count = value_counts_vector[i];
-    if (count) {
-      value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+  using CType = typename InType::c_type;
+
+  using ValueCountPair = std::pair<CType, int64_t>;
+  auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+    const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
+    return lhs.second > rhs.second ||
+           (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+  };
+
+  std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+      std::move(gt));
+
+  const ModeOptions& options = ModeState::Get(ctx);
+  while (true) {
+    const ValueCountPair& value_count = gen();
+    DCHECK_NE(value_count.second, 0);
+    if (value_count.second < 0) break;  // EOF reached
+    if (static_cast<int64_t>(min_heap.size()) < options.n) {
+      min_heap.push(value_count);
+    } else if (gt(value_count, min_heap.top())) {
+      min_heap.pop();
+      min_heap.push(value_count);
     }
   }
+  const int64_t n = min_heap.size();
 
-  return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
-  // see https://issues.apache.org/jira/browse/ARROW-9873
-  static constexpr int kMinArraySize = 8192 / sizeof(CType);
-  static constexpr int kMaxValueRange = 16384;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if ((array.length() - array.null_count()) >= kMinArraySize) {
-    CType min = std::numeric_limits<CType>::max();
-    CType max = std::numeric_limits<CType>::min();
-
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             min = std::min(min, value);
-                                             max = std::max(max, value);
-                                           }
-                                         });
-
-    if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
-      return CountValuesByVector(array, min, max);
-    }
-  }
-  return CountValuesByMap(array);
-}
+  CType* mode_buffer;
+  int64_t* count_buffer;
+  KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                         PrepareOutput<InType>(n, ctx, out));
 
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  // we need just count ones and zeros
-  CounterMap<CType> map;
-  if (array.length() > array.null_count()) {
-    map[true] = array.true_count();
-    map[false] = array.length() - array.null_count() - map[true];
+  for (int64_t i = n - 1; i >= 0; --i) {
+    std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+    min_heap.pop();
   }
-  nan_count = 0;
-  return map;
 }
 
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  using Limits = std::numeric_limits<CType>;
-  nan_count = 0;
-  return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurrences for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+  using CType = typename T::c_type;
 
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMapOrVector(array);
-}
+  CType min;
+  std::vector<int64_t> counts;
 
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>>  // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMap(array, nan_count);
-}
+  CountModer(CType min, CType max) {
+    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+    DCHECK_LT(value_range, 1 << 20);
+    this->min = min;
+    this->counts.resize(value_range, 0);
+  }
 
-template <typename ArrowType>
-struct ModeState {
-  using ThisType = ModeState<ArrowType>;
-  using CType = typename ArrowType::c_type;
-
-  void MergeFrom(ThisType&& state) {
-    if (this->value_counts.empty()) {
-      this->value_counts = std::move(state.value_counts);
-    } else {
-      for (const auto& value_count : state.value_counts) {
-        auto value = value_count.first;
-        auto count = value_count.second;
-        this->value_counts[value] += count;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // count values in all chunks, ignore nulls
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                ++this->counts[values[pos + i] - this->min];
+                              }
+                            });
       }
     }
-    if (is_floating_type<ArrowType>::value) {
-      this->nan_count += state.nan_count;
-    }
-  }
-
-  // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
-  void Finalize(CType* modes, int64_t* counts, const int64_t n) {
-    DCHECK(n >= 1 && n <= this->DistinctValues());
 
-    // mode 'greater than' comparator: larger count or same count with smaller value
-    using ValueCountPair = std::pair<CType, int64_t>;
-    auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
-      const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
-      return lhs.second > rhs.second ||
-             (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+    // generator to emit next value:count pair
+    int index = 0;
+    auto gen = [&]() {
+      for (; index < static_cast<int>(counts.size()); ++index) {
+        if (counts[index] != 0) {
+          auto value_count =
+              std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+          ++index;
+          return value_count;
+        }
+      }
+      return std::make_pair<CType, int64_t>(0, -1);  // EOF
     };
 
-    // initialize min-heap with first n modes
-    std::vector<ValueCountPair> vector(n);
-    // push nan if exists
-    const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
-    if (has_nan) {
-      vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
-    }
-    // push n or n-1 modes
-    auto it = this->value_counts.cbegin();
-    for (int i = has_nan; i < n; ++i) {
-      vector[i] = *it++;
-    }
-    // turn to min-heap
-    std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
-        min_heap(std::move(mode_gt), std::move(vector));
-
-    // iterate and insert modes into min-heap
-    // - mode < heap top: ignore mode
-    // - mode > heap top: discard heap top, insert mode
-    for (; it != this->value_counts.cend(); ++it) {
-      if (mode_gt(*it, min_heap.top())) {
-        min_heap.pop();
-        min_heap.push(*it);
-      }
+    Finalize<T>(ctx, out, std::move(gen));
+  }
+};
+
+// booleans can be handled more straightforwardly
+template <>
+struct CountModer<BooleanType> {
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    int64_t counts[2]{};
+
+    const Datum& datum = batch[0];
+    for (const auto& array : datum.chunks()) {
+      if (array->length() > array->null_count()) {
+        const int64_t true_count =
+            checked_pointer_cast<BooleanArray>(array)->true_count();
+        const int64_t false_count = array->length() - array->null_count() - true_count;
+        counts[true] += true_count;
+        counts[false] += false_count;
+      };
     }
 
-    // pop modes from min-heap and insert into output array (in reverse order)
-    DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
-    for (int64_t i = n - 1; i >= 0; --i) {
-      std::tie(modes[i], counts[i]) = min_heap.top();
-      min_heap.pop();
+    const ModeOptions& options = ModeState::Get(ctx);
+    const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+    const int64_t n = std::min(options.n, distinct_values);
+
+    bool* mode_buffer;
+    int64_t* count_buffer;
+    KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                           PrepareOutput<BooleanType>(n, ctx, out));
+
+    if (n >= 1) {
+      const bool index = counts[1] > counts[0];
+      mode_buffer[0] = index;
+      count_buffer[0] = counts[index];
+      if (n == 2) {
+        mode_buffer[1] = !index;
+        count_buffer[1] = counts[!index];
+      }
     }
   }
+};
 
-  int64_t DistinctValues() const {
-    return this->value_counts.size() +
-           (is_floating_type<ArrowType>::value && this->nan_count > 0);
-  }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+  using CType = typename T::c_type;
+  using Allocator = arrow::stl::allocator<CType>;
 
-  int64_t nan_count = 0;  // only make sense to floating types
-  CounterMap<CType> value_counts;
-};
+  int64_t nan_count = 0;
 
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
-  using ThisType = ModeImpl<ArrowType>;
-  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // copy all chunks to a buffer, ignore nulls and nans
+    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
 
-  ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : out_type(out_type), options(options) {}
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      in_buffer.resize(in_length);
 
-  void Consume(KernelContext*, const ExecBatch& batch) override {
-    ArrayType array(batch[0].array());
-    this->state.value_counts = CountValues(array, this->state.nan_count);
-  }
+      int64_t index = 0;
+      for (const auto& array : datum.chunks()) {
+        index += CopyArray(in_buffer.data() + index, *array);
+      }
 
-  void MergeFrom(KernelContext*, KernelState&& src) override {
-    auto& other = checked_cast<ThisType&>(src);
-    this->state.MergeFrom(std::move(other.state));
-  }
+      // drop nan
+      if (is_floating_type<T>::value) {
+        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+                                        [](CType v) { return v != v; });
+        this->nan_count = in_buffer.end() - it;
+        in_buffer.resize(it - in_buffer.begin());
+      }
+    }
 
-  static std::shared_ptr<ArrayData> MakeArrayData(
-      const std::shared_ptr<DataType>& data_type, int64_t n) {
-    auto data = ArrayData::Make(data_type, n, 0);
-    data->buffers.resize(2);
-    data->buffers[0] = nullptr;
-    data->buffers[1] = nullptr;
-    return data;
+    // sort the input data to count same values
+    std::sort(in_buffer.begin(), in_buffer.end());
+
+    // generator to emit next value:count pair
+    auto it = in_buffer.cbegin();
+    int64_t nan_count_copy = this->nan_count;
+    auto gen = [&]() {
+      if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+        // handle NAN at last
+        if (nan_count_copy > 0) {
+          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+          nan_count_copy = 0;
+          return value_count;
+        }
+        return std::make_pair<CType, int64_t>(0, -1);  // EOF
+      }
+      // count same values
+      const CType value = *it;
+      int64_t count = 0;
+      do {
+        ++it;
+        ++count;
+      } while (it != in_buffer.cend() && *it == value);
+      return std::make_pair(value, count);
+    };
+
+    Finalize<T>(ctx, out, std::move(gen));
   }
 
-  void Finalize(KernelContext* ctx, Datum* out) override {
-    const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
-    const auto& count_type = int64();
-    const auto& out_type =
-        struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
-    int64_t n = this->options.n;
-    if (n > state.DistinctValues()) {
-      n = state.DistinctValues();
-    } else if (n < 0) {
-      n = 0;
+  static int64_t CopyArray(CType* buffer, const Array& array) {
+    const int64_t n = array.length() - array.null_count();
+    if (n > 0) {
+      int64_t index = 0;
+      const ArrayData& data = *array.data();
+      const CType* values = data.GetValues<CType>(1);
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            memcpy(buffer + index, values + pos, len * sizeof(CType));
+                            index += len;
+                          });
+      DCHECK_EQ(index, n);
     }
+    return n;
+  }
+};
 
-    auto mode_data = this->MakeArrayData(mode_type, n);
-    auto count_data = this->MakeArrayData(count_type, n);
-    if (n > 0) {
-      KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(CType)));
-      KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(int64_t)));
-      CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
-      int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
-      this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+  using CType = typename T::c_type;
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // cross point to benefit from counting approach
+    // about 2x improvement for int32/64 from micro-benchmarking
+    static constexpr int kMinArraySize = 8192;
+    static constexpr int kMaxValueRange = 32768;
+
+    const Datum& datum = batch[0];
+    if (datum.length() - datum.null_count() >= kMinArraySize) {
+      CType min = std::numeric_limits<CType>::max();
+      CType max = std::numeric_limits<CType>::min();
+
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                min = std::min(min, values[pos + i]);
+                                max = std::max(max, values[pos + i]);
+                              }
+                            });
+      }
+
+      if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
+        CountModer<T>(min, max).Exec(ctx, batch, out);
+        return;
+      }
     }
 
-    *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
+    SortModer<T>().Exec(ctx, batch, out);
   }
+};
 
-  std::shared_ptr<DataType> out_type;
-  ModeState<ArrowType> state;
-  ModeOptions options;
+template <typename InType, typename Enable = void>
+struct Moder;
+
+template <>
+struct Moder<Int8Type> {
+  CountModer<Int8Type> impl;
+  Moder() : impl(-128, 127) {}
 };
 
-struct ModeInitState {
-  std::unique_ptr<KernelState> state;
-  KernelContext* ctx;
-  const DataType& in_type;
-  const std::shared_ptr<DataType>& out_type;
-  const ModeOptions& options;
+template <>
+struct Moder<UInt8Type> {
+  CountModer<UInt8Type> impl;
+  Moder() : impl(0, 255) {}
+};
 
-  ModeInitState(KernelContext* ctx, const DataType& in_type,
-                const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}
+template <>
+struct Moder<BooleanType> {
+  CountModer<BooleanType> impl;
+};
 
-  Status Visit(const DataType&) { return Status::NotImplemented("No mode implemented"); }
+template <typename InType>
+struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
+                                  (sizeof(typename InType::c_type) > 1))>> {
+  CountOrSortModer<InType> impl;
+};
 
-  Status Visit(const HalfFloatType&) {
-    return Status::NotImplemented("No mode implemented");
-  }
+template <typename InType>
+struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
+  SortModer<InType> impl;
+};
 
-  template <typename Type>
-  enable_if_t<is_number_type<Type>::value || is_boolean_type<Type>::value, Status> Visit(
-      const Type&) {
-    state.reset(new ModeImpl<Type>(out_type, options));
-    return Status::OK();
-  }
+template <typename _, typename InType>
+struct ModeExecutor {
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (ctx->state() == nullptr) {
+      ctx->SetStatus(Status::Invalid("Mode requires ModeOptions"));
+      return;
+    }
+    const ModeOptions& options = ModeState::Get(ctx);
+    if (options.n <= 0) {
+      ctx->SetStatus(Status::Invalid("ModeOption::n must be positive"));

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613724252



##########
File path: cpp/src/arrow/util/bit_run_reader.h
##########
@@ -480,8 +480,8 @@ Status VisitSetBitRuns(const uint8_t* bitmap, int64_t offset, int64_t length,
 }
 
 template <typename Visit>
-void VisitSetBitRunsVoid(const uint8_t* bitmap, int64_t offset, int64_t length,
-                         Visit&& visit) {
+inline void VisitSetBitRunsVoid(const uint8_t* bitmap, int64_t offset, int64_t length,

Review comment:
       Add `inline` hint.
   If the caller is in a .cpp source file, the compiler is willing to inline this function. But if the caller is in a header file, the compiler prefers not to inline it, even though in reality inlining there doesn't increase binary size compared with a call from a source file. Not inlining causes a big perf drop, as the visitor becomes a real function call and cannot be optimized together with the loop.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-820516174


   I checked GitHub Actions and Travis-CI on my fork; will merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613727094



##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
 
 namespace {
 
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
 constexpr char kModeFieldName[] = "mode";
 constexpr char kCountFieldName[] = "count";
 
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array, int64_t& nan_count) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  nan_count = 0;
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             if (std::isnan(value)) {
-                                               ++nan_count;
-                                             } else {
-                                               ++value_counts_map[value];
-                                             }
-                                           }
-                                         });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+                                                  Datum* out) {
+  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  const auto& count_type = int64();
+
+  auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+  mode_data->buffers.resize(2, nullptr);
+  auto count_data = ArrayData::Make(count_type, n, 0);
+  count_data->buffers.resize(2, nullptr);
+
+  CType* mode_buffer = nullptr;
+  int64_t* count_buffer = nullptr;
+
+  if (n > 0) {
+    ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+    ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+    mode_buffer = mode_data->template GetMutableValues<CType>(1);
+    count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }
 
-  return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_map[values[pos + i]];
-                                           }
-                                         });
-  }
+  const auto& out_type =
+      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
 
-  return value_counts_map;
+  return std::make_pair(mode_buffer, count_buffer);
 }
 
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
-  const int range = static_cast<int>(max - min);
-  DCHECK(range >= 0 && range < 64 * 1024 * 1024);
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  std::vector<int64_t> value_counts_vector(range + 1);
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_vector[values[pos + i] - min];
-                                           }
-                                         });
-  }
-
-  // Transfer value counts to a map to be consistent with other chunks
-  CounterMap<CType> value_counts_map(range + 1);
-  for (int i = 0; i <= range; ++i) {
-    CType value = static_cast<CType>(i + min);
-    int64_t count = value_counts_vector[i];
-    if (count) {
-      value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+  using CType = typename InType::c_type;
+
+  using ValueCountPair = std::pair<CType, int64_t>;
+  auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+    const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
+    return lhs.second > rhs.second ||
+           (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+  };
+
+  std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+      std::move(gt));
+
+  const ModeOptions& options = ModeState::Get(ctx);
+  while (true) {
+    const ValueCountPair& value_count = gen();
+    DCHECK_NE(value_count.second, 0);
+    if (value_count.second < 0) break;  // EOF reached
+    if (static_cast<int64_t>(min_heap.size()) < options.n) {
+      min_heap.push(value_count);
+    } else if (gt(value_count, min_heap.top())) {
+      min_heap.pop();
+      min_heap.push(value_count);
     }
   }
+  const int64_t n = min_heap.size();
 
-  return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
-  // see https://issues.apache.org/jira/browse/ARROW-9873
-  static constexpr int kMinArraySize = 8192 / sizeof(CType);
-  static constexpr int kMaxValueRange = 16384;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if ((array.length() - array.null_count()) >= kMinArraySize) {
-    CType min = std::numeric_limits<CType>::max();
-    CType max = std::numeric_limits<CType>::min();
-
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             min = std::min(min, value);
-                                             max = std::max(max, value);
-                                           }
-                                         });
-
-    if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
-      return CountValuesByVector(array, min, max);
-    }
-  }
-  return CountValuesByMap(array);
-}
+  CType* mode_buffer;
+  int64_t* count_buffer;
+  KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                         PrepareOutput<InType>(n, ctx, out));
 
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  // we need just count ones and zeros
-  CounterMap<CType> map;
-  if (array.length() > array.null_count()) {
-    map[true] = array.true_count();
-    map[false] = array.length() - array.null_count() - map[true];
+  for (int64_t i = n - 1; i >= 0; --i) {
+    std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+    min_heap.pop();
   }
-  nan_count = 0;
-  return map;
 }
 
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  using Limits = std::numeric_limits<CType>;
-  nan_count = 0;
-  return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurrences for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+  using CType = typename T::c_type;
 
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMapOrVector(array);
-}
+  CType min;
+  std::vector<int64_t> counts;
 
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>>  // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMap(array, nan_count);
-}
+  CountModer(CType min, CType max) {
+    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+    DCHECK_LT(value_range, 1 << 20);
+    this->min = min;
+    this->counts.resize(value_range, 0);
+  }
 
-template <typename ArrowType>
-struct ModeState {
-  using ThisType = ModeState<ArrowType>;
-  using CType = typename ArrowType::c_type;
-
-  void MergeFrom(ThisType&& state) {
-    if (this->value_counts.empty()) {
-      this->value_counts = std::move(state.value_counts);
-    } else {
-      for (const auto& value_count : state.value_counts) {
-        auto value = value_count.first;
-        auto count = value_count.second;
-        this->value_counts[value] += count;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // count values in all chunks, ignore nulls
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                ++this->counts[values[pos + i] - this->min];
+                              }
+                            });
       }
     }
-    if (is_floating_type<ArrowType>::value) {
-      this->nan_count += state.nan_count;
-    }
-  }
-
-  // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
-  void Finalize(CType* modes, int64_t* counts, const int64_t n) {
-    DCHECK(n >= 1 && n <= this->DistinctValues());
 
-    // mode 'greater than' comparator: larger count or same count with smaller value
-    using ValueCountPair = std::pair<CType, int64_t>;
-    auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
-      const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
-      return lhs.second > rhs.second ||
-             (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+    // generator to emit next value:count pair
+    int index = 0;
+    auto gen = [&]() {
+      for (; index < static_cast<int>(counts.size()); ++index) {
+        if (counts[index] != 0) {
+          auto value_count =
+              std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+          ++index;
+          return value_count;
+        }
+      }
+      return std::make_pair<CType, int64_t>(0, -1);  // EOF
     };
 
-    // initialize min-heap with first n modes
-    std::vector<ValueCountPair> vector(n);
-    // push nan if exists
-    const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
-    if (has_nan) {
-      vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
-    }
-    // push n or n-1 modes
-    auto it = this->value_counts.cbegin();
-    for (int i = has_nan; i < n; ++i) {
-      vector[i] = *it++;
-    }
-    // turn to min-heap
-    std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
-        min_heap(std::move(mode_gt), std::move(vector));
-
-    // iterate and insert modes into min-heap
-    // - mode < heap top: ignore mode
-    // - mode > heap top: discard heap top, insert mode
-    for (; it != this->value_counts.cend(); ++it) {
-      if (mode_gt(*it, min_heap.top())) {
-        min_heap.pop();
-        min_heap.push(*it);
-      }
+    Finalize<T>(ctx, out, std::move(gen));
+  }
+};
+
+// booleans can be handled more straightforwardly
+template <>
+struct CountModer<BooleanType> {
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    int64_t counts[2]{};
+
+    const Datum& datum = batch[0];
+    for (const auto& array : datum.chunks()) {
+      if (array->length() > array->null_count()) {
+        const int64_t true_count =
+            checked_pointer_cast<BooleanArray>(array)->true_count();
+        const int64_t false_count = array->length() - array->null_count() - true_count;
+        counts[true] += true_count;
+        counts[false] += false_count;
+      };
     }
 
-    // pop modes from min-heap and insert into output array (in reverse order)
-    DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
-    for (int64_t i = n - 1; i >= 0; --i) {
-      std::tie(modes[i], counts[i]) = min_heap.top();
-      min_heap.pop();
+    const ModeOptions& options = ModeState::Get(ctx);
+    const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+    const int64_t n = std::min(options.n, distinct_values);
+
+    bool* mode_buffer;
+    int64_t* count_buffer;
+    KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                           PrepareOutput<BooleanType>(n, ctx, out));
+
+    if (n >= 1) {
+      const bool index = counts[1] > counts[0];
+      mode_buffer[0] = index;
+      count_buffer[0] = counts[index];
+      if (n == 2) {
+        mode_buffer[1] = !index;
+        count_buffer[1] = counts[!index];
+      }
     }
   }
+};
 
-  int64_t DistinctValues() const {
-    return this->value_counts.size() +
-           (is_floating_type<ArrowType>::value && this->nan_count > 0);
-  }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+  using CType = typename T::c_type;
+  using Allocator = arrow::stl::allocator<CType>;
 
-  int64_t nan_count = 0;  // only make sense to floating types
-  CounterMap<CType> value_counts;
-};
+  int64_t nan_count = 0;
 
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
-  using ThisType = ModeImpl<ArrowType>;
-  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // copy all chunks to a buffer, ignore nulls and nans
+    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
 
-  ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : out_type(out_type), options(options) {}
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      in_buffer.resize(in_length);
 
-  void Consume(KernelContext*, const ExecBatch& batch) override {
-    ArrayType array(batch[0].array());
-    this->state.value_counts = CountValues(array, this->state.nan_count);
-  }
+      int64_t index = 0;
+      for (const auto& array : datum.chunks()) {
+        index += CopyArray(in_buffer.data() + index, *array);
+      }
 
-  void MergeFrom(KernelContext*, KernelState&& src) override {
-    auto& other = checked_cast<ThisType&>(src);
-    this->state.MergeFrom(std::move(other.state));
-  }
+      // drop nan
+      if (is_floating_type<T>::value) {
+        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+                                        [](CType v) { return v != v; });
+        this->nan_count = in_buffer.end() - it;
+        in_buffer.resize(it - in_buffer.begin());
+      }
+    }
 
-  static std::shared_ptr<ArrayData> MakeArrayData(
-      const std::shared_ptr<DataType>& data_type, int64_t n) {
-    auto data = ArrayData::Make(data_type, n, 0);
-    data->buffers.resize(2);
-    data->buffers[0] = nullptr;
-    data->buffers[1] = nullptr;
-    return data;
+    // sort the input data to count same values
+    std::sort(in_buffer.begin(), in_buffer.end());
+
+    // generator to emit next value:count pair
+    auto it = in_buffer.cbegin();
+    int64_t nan_count_copy = this->nan_count;
+    auto gen = [&]() {
+      if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+        // handle NAN at last
+        if (nan_count_copy > 0) {
+          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+          nan_count_copy = 0;
+          return value_count;
+        }
+        return std::make_pair<CType, int64_t>(0, -1);  // EOF
+      }
+      // count same values
+      const CType value = *it;
+      int64_t count = 0;
+      do {
+        ++it;
+        ++count;
+      } while (it != in_buffer.cend() && *it == value);
+      return std::make_pair(value, count);
+    };
+
+    Finalize<T>(ctx, out, std::move(gen));
   }
 
-  void Finalize(KernelContext* ctx, Datum* out) override {
-    const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
-    const auto& count_type = int64();
-    const auto& out_type =
-        struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
-    int64_t n = this->options.n;
-    if (n > state.DistinctValues()) {
-      n = state.DistinctValues();
-    } else if (n < 0) {
-      n = 0;
+  static int64_t CopyArray(CType* buffer, const Array& array) {
+    const int64_t n = array.length() - array.null_count();
+    if (n > 0) {
+      int64_t index = 0;
+      const ArrayData& data = *array.data();
+      const CType* values = data.GetValues<CType>(1);
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            memcpy(buffer + index, values + pos, len * sizeof(CType));
+                            index += len;
+                          });
+      DCHECK_EQ(index, n);
     }
+    return n;
+  }
+};
 
-    auto mode_data = this->MakeArrayData(mode_type, n);
-    auto count_data = this->MakeArrayData(count_type, n);
-    if (n > 0) {
-      KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(CType)));
-      KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(int64_t)));
-      CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
-      int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
-      this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+  using CType = typename T::c_type;
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // cross point to benefit from counting approach
+    // about 2x improvement for int32/64 from micro-benchmarking
+    static constexpr int kMinArraySize = 8192;
+    static constexpr int kMaxValueRange = 32768;
+
+    const Datum& datum = batch[0];
+    if (datum.length() - datum.null_count() >= kMinArraySize) {
+      CType min = std::numeric_limits<CType>::max();
+      CType max = std::numeric_limits<CType>::min();
+
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                min = std::min(min, values[pos + i]);
+                                max = std::max(max, values[pos + i]);
+                              }
+                            });

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818871509


   Do the existing tests exercise both the narrow and wide cases?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612599321



##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
 
 namespace {
 
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
 constexpr char kModeFieldName[] = "mode";
 constexpr char kCountFieldName[] = "count";
 
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array, int64_t& nan_count) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  nan_count = 0;
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             if (std::isnan(value)) {
-                                               ++nan_count;
-                                             } else {
-                                               ++value_counts_map[value];
-                                             }
-                                           }
-                                         });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+                                                  Datum* out) {
+  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  const auto& count_type = int64();
+
+  auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+  mode_data->buffers.resize(2, nullptr);
+  auto count_data = ArrayData::Make(count_type, n, 0);
+  count_data->buffers.resize(2, nullptr);
+
+  CType* mode_buffer = nullptr;
+  int64_t* count_buffer = nullptr;
+
+  if (n > 0) {
+    ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+    ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+    mode_buffer = mode_data->template GetMutableValues<CType>(1);
+    count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }
 
-  return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_map[values[pos + i]];
-                                           }
-                                         });
-  }
+  const auto& out_type =
+      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
 
-  return value_counts_map;
+  return std::make_pair(mode_buffer, count_buffer);
 }
 
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
-  const int range = static_cast<int>(max - min);
-  DCHECK(range >= 0 && range < 64 * 1024 * 1024);
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  std::vector<int64_t> value_counts_vector(range + 1);
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_vector[values[pos + i] - min];
-                                           }
-                                         });
-  }
-
-  // Transfer value counts to a map to be consistent with other chunks
-  CounterMap<CType> value_counts_map(range + 1);
-  for (int i = 0; i <= range; ++i) {
-    CType value = static_cast<CType>(i + min);
-    int64_t count = value_counts_vector[i];
-    if (count) {
-      value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+  using CType = typename InType::c_type;
+
+  using ValueCountPair = std::pair<CType, int64_t>;
+  auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+    const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
+    return lhs.second > rhs.second ||
+           (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+  };
+
+  std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+      std::move(gt));
+
+  const ModeOptions& options = ModeState::Get(ctx);
+  while (true) {
+    const ValueCountPair& value_count = gen();
+    DCHECK_NE(value_count.second, 0);
+    if (value_count.second < 0) break;  // EOF reached
+    if (static_cast<int64_t>(min_heap.size()) < options.n) {
+      min_heap.push(value_count);
+    } else if (gt(value_count, min_heap.top())) {
+      min_heap.pop();
+      min_heap.push(value_count);
     }
   }
+  const int64_t n = min_heap.size();
 
-  return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
-  // see https://issues.apache.org/jira/browse/ARROW-9873
-  static constexpr int kMinArraySize = 8192 / sizeof(CType);
-  static constexpr int kMaxValueRange = 16384;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if ((array.length() - array.null_count()) >= kMinArraySize) {
-    CType min = std::numeric_limits<CType>::max();
-    CType max = std::numeric_limits<CType>::min();
-
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             min = std::min(min, value);
-                                             max = std::max(max, value);
-                                           }
-                                         });
-
-    if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
-      return CountValuesByVector(array, min, max);
-    }
-  }
-  return CountValuesByMap(array);
-}
+  CType* mode_buffer;
+  int64_t* count_buffer;
+  KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                         PrepareOutput<InType>(n, ctx, out));
 
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  // we need just count ones and zeros
-  CounterMap<CType> map;
-  if (array.length() > array.null_count()) {
-    map[true] = array.true_count();
-    map[false] = array.length() - array.null_count() - map[true];
+  for (int64_t i = n - 1; i >= 0; --i) {
+    std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+    min_heap.pop();
   }
-  nan_count = 0;
-  return map;
 }
 
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  using Limits = std::numeric_limits<CType>;
-  nan_count = 0;
-  return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurrences for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+  using CType = typename T::c_type;
 
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMapOrVector(array);
-}
+  CType min;
+  std::vector<int64_t> counts;
 
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>>  // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMap(array, nan_count);
-}
+  CountModer(CType min, CType max) {
+    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+    DCHECK_LT(value_range, 1 << 20);
+    this->min = min;
+    this->counts.resize(value_range, 0);
+  }
 
-template <typename ArrowType>
-struct ModeState {
-  using ThisType = ModeState<ArrowType>;
-  using CType = typename ArrowType::c_type;
-
-  void MergeFrom(ThisType&& state) {
-    if (this->value_counts.empty()) {
-      this->value_counts = std::move(state.value_counts);
-    } else {
-      for (const auto& value_count : state.value_counts) {
-        auto value = value_count.first;
-        auto count = value_count.second;
-        this->value_counts[value] += count;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // count values in all chunks, ignore nulls
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                ++this->counts[values[pos + i] - this->min];
+                              }
+                            });
       }
     }
-    if (is_floating_type<ArrowType>::value) {
-      this->nan_count += state.nan_count;
-    }
-  }
-
-  // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
-  void Finalize(CType* modes, int64_t* counts, const int64_t n) {
-    DCHECK(n >= 1 && n <= this->DistinctValues());
 
-    // mode 'greater than' comparator: larger count or same count with smaller value
-    using ValueCountPair = std::pair<CType, int64_t>;
-    auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
-      const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
-      return lhs.second > rhs.second ||
-             (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+    // generator to emit next value:count pair
+    int index = 0;
+    auto gen = [&]() {
+      for (; index < static_cast<int>(counts.size()); ++index) {
+        if (counts[index] != 0) {
+          auto value_count =
+              std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+          ++index;
+          return value_count;
+        }
+      }
+      return std::make_pair<CType, int64_t>(0, -1);  // EOF
     };
 
-    // initialize min-heap with first n modes
-    std::vector<ValueCountPair> vector(n);
-    // push nan if exists
-    const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
-    if (has_nan) {
-      vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
-    }
-    // push n or n-1 modes
-    auto it = this->value_counts.cbegin();
-    for (int i = has_nan; i < n; ++i) {
-      vector[i] = *it++;
-    }
-    // turn to min-heap
-    std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
-        min_heap(std::move(mode_gt), std::move(vector));
-
-    // iterate and insert modes into min-heap
-    // - mode < heap top: ignore mode
-    // - mode > heap top: discard heap top, insert mode
-    for (; it != this->value_counts.cend(); ++it) {
-      if (mode_gt(*it, min_heap.top())) {
-        min_heap.pop();
-        min_heap.push(*it);
-      }
+    Finalize<T>(ctx, out, std::move(gen));
+  }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    int64_t counts[2]{};
+
+    const Datum& datum = batch[0];
+    for (const auto& array : datum.chunks()) {
+      if (array->length() > array->null_count()) {
+        const int64_t true_count =
+            checked_pointer_cast<BooleanArray>(array)->true_count();
+        const int64_t false_count = array->length() - array->null_count() - true_count;
+        counts[true] += true_count;
+        counts[false] += false_count;
+      };
     }
 
-    // pop modes from min-heap and insert into output array (in reverse order)
-    DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
-    for (int64_t i = n - 1; i >= 0; --i) {
-      std::tie(modes[i], counts[i]) = min_heap.top();
-      min_heap.pop();
+    const ModeOptions& options = ModeState::Get(ctx);
+    const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+    const int64_t n = std::min(options.n, distinct_values);
+
+    bool* mode_buffer;
+    int64_t* count_buffer;
+    KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                           PrepareOutput<BooleanType>(n, ctx, out));
+
+    if (n >= 1) {
+      const bool index = counts[1] > counts[0];
+      mode_buffer[0] = index;
+      count_buffer[0] = counts[index];
+      if (n == 2) {
+        mode_buffer[1] = !index;
+        count_buffer[1] = counts[!index];
+      }
     }
   }
+};
 
-  int64_t DistinctValues() const {
-    return this->value_counts.size() +
-           (is_floating_type<ArrowType>::value && this->nan_count > 0);
-  }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+  using CType = typename T::c_type;
+  using Allocator = arrow::stl::allocator<CType>;
 
-  int64_t nan_count = 0;  // only make sense to floating types
-  CounterMap<CType> value_counts;
-};
+  int64_t nan_count = 0;
 
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
-  using ThisType = ModeImpl<ArrowType>;
-  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // copy all chunks to a buffer, ignore nulls and nans
+    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
 
-  ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : out_type(out_type), options(options) {}
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      in_buffer.resize(in_length);
 
-  void Consume(KernelContext*, const ExecBatch& batch) override {
-    ArrayType array(batch[0].array());
-    this->state.value_counts = CountValues(array, this->state.nan_count);
-  }
+      int64_t index = 0;
+      for (const auto& array : datum.chunks()) {
+        index += CopyArray(in_buffer.data() + index, *array);
+      }
 
-  void MergeFrom(KernelContext*, KernelState&& src) override {
-    auto& other = checked_cast<ThisType&>(src);
-    this->state.MergeFrom(std::move(other.state));
-  }
+      // drop nan
+      if (is_floating_type<T>::value) {
+        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+                                        [](CType v) { return v != v; });
+        this->nan_count = in_buffer.end() - it;
+        in_buffer.resize(it - in_buffer.begin());
+      }
+    }
 
-  static std::shared_ptr<ArrayData> MakeArrayData(
-      const std::shared_ptr<DataType>& data_type, int64_t n) {
-    auto data = ArrayData::Make(data_type, n, 0);
-    data->buffers.resize(2);
-    data->buffers[0] = nullptr;
-    data->buffers[1] = nullptr;
-    return data;
+    // sort the input data to count same values
+    std::sort(in_buffer.begin(), in_buffer.end());
+
+    // generator to emit next value:count pair
+    auto it = in_buffer.cbegin();
+    int64_t nan_count_copy = this->nan_count;
+    auto gen = [&]() {
+      if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+        // handle NAN at last
+        if (nan_count_copy > 0) {
+          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+          nan_count_copy = 0;
+          return value_count;
+        }
+        return std::make_pair<CType, int64_t>(0, -1);  // EOF
+      }
+      // count same values
+      const CType value = *it;
+      int64_t count = 0;
+      do {
+        ++it;
+        ++count;
+      } while (it != in_buffer.cend() && *it == value);
+      return std::make_pair(value, count);
+    };
+
+    Finalize<T>(ctx, out, std::move(gen));
   }
 
-  void Finalize(KernelContext* ctx, Datum* out) override {
-    const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
-    const auto& count_type = int64();
-    const auto& out_type =
-        struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
-    int64_t n = this->options.n;
-    if (n > state.DistinctValues()) {
-      n = state.DistinctValues();
-    } else if (n < 0) {
-      n = 0;
+  static int64_t CopyArray(CType* buffer, const Array& array) {
+    const int64_t n = array.length() - array.null_count();
+    if (n > 0) {
+      int64_t index = 0;
+      const ArrayData& data = *array.data();
+      const CType* values = data.GetValues<CType>(1);
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            memcpy(buffer + index, values + pos, len * sizeof(CType));
+                            index += len;
+                          });
+      DCHECK_EQ(index, n);
     }
+    return n;
+  }
+};
 
-    auto mode_data = this->MakeArrayData(mode_type, n);
-    auto count_data = this->MakeArrayData(count_type, n);
-    if (n > 0) {
-      KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(CType)));
-      KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(int64_t)));
-      CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
-      int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
-      this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+  using CType = typename T::c_type;
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // cross point to benefit from counting approach
+    // about 2x improvement for int32/64 from micro-benchmarking
+    static constexpr int kMinArraySize = 8192;
+    static constexpr int kMaxValueRange = 32768;
+
+    const Datum& datum = batch[0];
+    if (datum.length() - datum.null_count() >= kMinArraySize) {
+      CType min = std::numeric_limits<CType>::max();
+      CType max = std::numeric_limits<CType>::min();
+
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                min = std::min(min, values[pos + i]);
+                                max = std::max(max, values[pos + i]);
+                              }
+                            });

Review comment:
       Isn't this operation (computing the min/max of a Datum or ArrayData) common enough to be factored out into an internal header file?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612950895



##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
 
 namespace {
 
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
 constexpr char kModeFieldName[] = "mode";
 constexpr char kCountFieldName[] = "count";
 
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array, int64_t& nan_count) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  nan_count = 0;
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             if (std::isnan(value)) {
-                                               ++nan_count;
-                                             } else {
-                                               ++value_counts_map[value];
-                                             }
-                                           }
-                                         });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+                                                  Datum* out) {
+  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  const auto& count_type = int64();
+
+  auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+  mode_data->buffers.resize(2, nullptr);
+  auto count_data = ArrayData::Make(count_type, n, 0);
+  count_data->buffers.resize(2, nullptr);
+
+  CType* mode_buffer = nullptr;
+  int64_t* count_buffer = nullptr;
+
+  if (n > 0) {
+    ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+    ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+    mode_buffer = mode_data->template GetMutableValues<CType>(1);
+    count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }
 
-  return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_map[values[pos + i]];
-                                           }
-                                         });
-  }
+  const auto& out_type =
+      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
 
-  return value_counts_map;
+  return std::make_pair(mode_buffer, count_buffer);
 }
 
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
-  const int range = static_cast<int>(max - min);
-  DCHECK(range >= 0 && range < 64 * 1024 * 1024);
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  std::vector<int64_t> value_counts_vector(range + 1);
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_vector[values[pos + i] - min];
-                                           }
-                                         });
-  }
-
-  // Transfer value counts to a map to be consistent with other chunks
-  CounterMap<CType> value_counts_map(range + 1);
-  for (int i = 0; i <= range; ++i) {
-    CType value = static_cast<CType>(i + min);
-    int64_t count = value_counts_vector[i];
-    if (count) {
-      value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+  using CType = typename InType::c_type;
+
+  using ValueCountPair = std::pair<CType, int64_t>;
+  auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+    const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
+    return lhs.second > rhs.second ||
+           (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+  };
+
+  std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+      std::move(gt));
+
+  const ModeOptions& options = ModeState::Get(ctx);
+  while (true) {
+    const ValueCountPair& value_count = gen();
+    DCHECK_NE(value_count.second, 0);
+    if (value_count.second < 0) break;  // EOF reached
+    if (static_cast<int64_t>(min_heap.size()) < options.n) {
+      min_heap.push(value_count);
+    } else if (gt(value_count, min_heap.top())) {
+      min_heap.pop();
+      min_heap.push(value_count);
     }
   }
+  const int64_t n = min_heap.size();
 
-  return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
-  // see https://issues.apache.org/jira/browse/ARROW-9873
-  static constexpr int kMinArraySize = 8192 / sizeof(CType);
-  static constexpr int kMaxValueRange = 16384;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if ((array.length() - array.null_count()) >= kMinArraySize) {
-    CType min = std::numeric_limits<CType>::max();
-    CType max = std::numeric_limits<CType>::min();
-
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             min = std::min(min, value);
-                                             max = std::max(max, value);
-                                           }
-                                         });
-
-    if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
-      return CountValuesByVector(array, min, max);
-    }
-  }
-  return CountValuesByMap(array);
-}
+  CType* mode_buffer;
+  int64_t* count_buffer;
+  KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                         PrepareOutput<InType>(n, ctx, out));
 
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  // we need just count ones and zeros
-  CounterMap<CType> map;
-  if (array.length() > array.null_count()) {
-    map[true] = array.true_count();
-    map[false] = array.length() - array.null_count() - map[true];
+  for (int64_t i = n - 1; i >= 0; --i) {
+    std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+    min_heap.pop();
   }
-  nan_count = 0;
-  return map;
 }
 
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  using Limits = std::numeric_limits<CType>;
-  nan_count = 0;
-  return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurrences for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+  using CType = typename T::c_type;
 
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMapOrVector(array);
-}
+  CType min;
+  std::vector<int64_t> counts;
 
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>>  // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMap(array, nan_count);
-}
+  CountModer(CType min, CType max) {
+    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+    DCHECK_LT(value_range, 1 << 20);
+    this->min = min;
+    this->counts.resize(value_range, 0);
+  }
 
-template <typename ArrowType>
-struct ModeState {
-  using ThisType = ModeState<ArrowType>;
-  using CType = typename ArrowType::c_type;
-
-  void MergeFrom(ThisType&& state) {
-    if (this->value_counts.empty()) {
-      this->value_counts = std::move(state.value_counts);
-    } else {
-      for (const auto& value_count : state.value_counts) {
-        auto value = value_count.first;
-        auto count = value_count.second;
-        this->value_counts[value] += count;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // count values in all chunks, ignore nulls
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                ++this->counts[values[pos + i] - this->min];
+                              }
+                            });
       }
     }
-    if (is_floating_type<ArrowType>::value) {
-      this->nan_count += state.nan_count;
-    }
-  }
-
-  // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
-  void Finalize(CType* modes, int64_t* counts, const int64_t n) {
-    DCHECK(n >= 1 && n <= this->DistinctValues());
 
-    // mode 'greater than' comparator: larger count or same count with smaller value
-    using ValueCountPair = std::pair<CType, int64_t>;
-    auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
-      const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
-      return lhs.second > rhs.second ||
-             (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+    // generator to emit next value:count pair
+    int index = 0;
+    auto gen = [&]() {
+      for (; index < static_cast<int>(counts.size()); ++index) {
+        if (counts[index] != 0) {
+          auto value_count =
+              std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+          ++index;
+          return value_count;
+        }
+      }
+      return std::make_pair<CType, int64_t>(0, -1);  // EOF
     };
 
-    // initialize min-heap with first n modes
-    std::vector<ValueCountPair> vector(n);
-    // push nan if exists
-    const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
-    if (has_nan) {
-      vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
-    }
-    // push n or n-1 modes
-    auto it = this->value_counts.cbegin();
-    for (int i = has_nan; i < n; ++i) {
-      vector[i] = *it++;
-    }
-    // turn to min-heap
-    std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
-        min_heap(std::move(mode_gt), std::move(vector));
-
-    // iterate and insert modes into min-heap
-    // - mode < heap top: ignore mode
-    // - mode > heap top: discard heap top, insert mode
-    for (; it != this->value_counts.cend(); ++it) {
-      if (mode_gt(*it, min_heap.top())) {
-        min_heap.pop();
-        min_heap.push(*it);
-      }
+    Finalize<T>(ctx, out, std::move(gen));
+  }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    int64_t counts[2]{};
+
+    const Datum& datum = batch[0];
+    for (const auto& array : datum.chunks()) {
+      if (array->length() > array->null_count()) {
+        const int64_t true_count =
+            checked_pointer_cast<BooleanArray>(array)->true_count();
+        const int64_t false_count = array->length() - array->null_count() - true_count;
+        counts[true] += true_count;
+        counts[false] += false_count;
+      };
     }
 
-    // pop modes from min-heap and insert into output array (in reverse order)
-    DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
-    for (int64_t i = n - 1; i >= 0; --i) {
-      std::tie(modes[i], counts[i]) = min_heap.top();
-      min_heap.pop();
+    const ModeOptions& options = ModeState::Get(ctx);
+    const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+    const int64_t n = std::min(options.n, distinct_values);
+
+    bool* mode_buffer;
+    int64_t* count_buffer;
+    KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                           PrepareOutput<BooleanType>(n, ctx, out));
+
+    if (n >= 1) {
+      const bool index = counts[1] > counts[0];
+      mode_buffer[0] = index;
+      count_buffer[0] = counts[index];
+      if (n == 2) {
+        mode_buffer[1] = !index;
+        count_buffer[1] = counts[!index];
+      }
     }
   }
+};
 
-  int64_t DistinctValues() const {
-    return this->value_counts.size() +
-           (is_floating_type<ArrowType>::value && this->nan_count > 0);
-  }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+  using CType = typename T::c_type;
+  using Allocator = arrow::stl::allocator<CType>;
 
-  int64_t nan_count = 0;  // only make sense to floating types
-  CounterMap<CType> value_counts;
-};
+  int64_t nan_count = 0;
 
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
-  using ThisType = ModeImpl<ArrowType>;
-  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // copy all chunks to a buffer, ignore nulls and nans
+    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
 
-  ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : out_type(out_type), options(options) {}
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      in_buffer.resize(in_length);
 
-  void Consume(KernelContext*, const ExecBatch& batch) override {
-    ArrayType array(batch[0].array());
-    this->state.value_counts = CountValues(array, this->state.nan_count);
-  }
+      int64_t index = 0;
+      for (const auto& array : datum.chunks()) {
+        index += CopyArray(in_buffer.data() + index, *array);
+      }
 
-  void MergeFrom(KernelContext*, KernelState&& src) override {
-    auto& other = checked_cast<ThisType&>(src);
-    this->state.MergeFrom(std::move(other.state));
-  }
+      // drop nan
+      if (is_floating_type<T>::value) {
+        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+                                        [](CType v) { return v != v; });
+        this->nan_count = in_buffer.end() - it;
+        in_buffer.resize(it - in_buffer.begin());
+      }
+    }
 
-  static std::shared_ptr<ArrayData> MakeArrayData(
-      const std::shared_ptr<DataType>& data_type, int64_t n) {
-    auto data = ArrayData::Make(data_type, n, 0);
-    data->buffers.resize(2);
-    data->buffers[0] = nullptr;
-    data->buffers[1] = nullptr;
-    return data;
+    // sort the input data to count same values
+    std::sort(in_buffer.begin(), in_buffer.end());
+
+    // generator to emit next value:count pair
+    auto it = in_buffer.cbegin();
+    int64_t nan_count_copy = this->nan_count;
+    auto gen = [&]() {
+      if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+        // handle NAN at last
+        if (nan_count_copy > 0) {
+          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+          nan_count_copy = 0;
+          return value_count;
+        }
+        return std::make_pair<CType, int64_t>(0, -1);  // EOF
+      }
+      // count same values
+      const CType value = *it;
+      int64_t count = 0;
+      do {
+        ++it;
+        ++count;
+      } while (it != in_buffer.cend() && *it == value);
+      return std::make_pair(value, count);
+    };
+
+    Finalize<T>(ctx, out, std::move(gen));
   }
 
-  void Finalize(KernelContext* ctx, Datum* out) override {
-    const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
-    const auto& count_type = int64();
-    const auto& out_type =
-        struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
-    int64_t n = this->options.n;
-    if (n > state.DistinctValues()) {
-      n = state.DistinctValues();
-    } else if (n < 0) {
-      n = 0;
+  static int64_t CopyArray(CType* buffer, const Array& array) {
+    const int64_t n = array.length() - array.null_count();
+    if (n > 0) {
+      int64_t index = 0;
+      const ArrayData& data = *array.data();
+      const CType* values = data.GetValues<CType>(1);
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            memcpy(buffer + index, values + pos, len * sizeof(CType));
+                            index += len;
+                          });
+      DCHECK_EQ(index, n);
     }
+    return n;
+  }
+};
 
-    auto mode_data = this->MakeArrayData(mode_type, n);
-    auto count_data = this->MakeArrayData(count_type, n);
-    if (n > 0) {
-      KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(CType)));
-      KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(int64_t)));
-      CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
-      int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
-      this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+  using CType = typename T::c_type;
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // cross point to benefit from counting approach
+    // about 2x improvement for int32/64 from micro-benchmarking
+    static constexpr int kMinArraySize = 8192;
+    static constexpr int kMaxValueRange = 32768;
+
+    const Datum& datum = batch[0];
+    if (datum.length() - datum.null_count() >= kMinArraySize) {
+      CType min = std::numeric_limits<CType>::max();
+      CType max = std::numeric_limits<CType>::min();
+
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                min = std::min(min, values[pos + i]);
+                                max = std::max(max, values[pos + i]);
+                              }
+                            });

Review comment:
       Will do.
   The Quantile and Mode kernels share a lot of common code. Will also factor that out.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818855204


   Benchmarks on Zen 2, clang 10.
   ```
   ----------------------------------------------------------------------------------------------
   Non-regressions: (44)
   ----------------------------------------------------------------------------------------------
                                     benchmark        baseline       contender  change % counters
     ModeKernelNarrow<Int64Type>/1048576/10000   1.344 GiB/sec   3.077 GiB/sec   129.033       {}
           ModeKernelWide<FloatType>/1048576/0  26.692 MiB/sec  60.554 MiB/sec   126.860       {}
         ModeKernelWide<FloatType>/1048576/100  26.720 MiB/sec  60.031 MiB/sec   124.667       {}
       ModeKernelWide<FloatType>/1048576/10000  26.707 MiB/sec  59.770 MiB/sec   123.799       {}
           ModeKernelWide<Int32Type>/1048576/0  35.031 MiB/sec  77.918 MiB/sec   122.426       {}
         ModeKernelNarrow<Int64Type>/1048576/0   1.399 GiB/sec   3.091 GiB/sec   120.934       {}
       ModeKernelWide<Int32Type>/1048576/10000  34.942 MiB/sec  77.128 MiB/sec   120.729       {}
        ModeKernelWide<DoubleType>/1048576/100  57.929 MiB/sec 127.702 MiB/sec   120.447       {}
       ModeKernelNarrow<Int64Type>/1048576/100   1.328 GiB/sec   2.874 GiB/sec   116.482       {}
      ModeKernelWide<DoubleType>/1048576/10000  58.316 MiB/sec 125.540 MiB/sec   115.276       {}
          ModeKernelWide<DoubleType>/1048576/0  58.070 MiB/sec 122.623 MiB/sec   111.164       {}
           ModeKernelWide<FloatType>/1048576/2  55.623 MiB/sec 117.360 MiB/sec   110.994       {}
          ModeKernelWide<DoubleType>/1048576/2 116.593 MiB/sec 243.300 MiB/sec   108.675       {}
         ModeKernelWide<Int64Type>/1048576/100  79.077 MiB/sec 162.463 MiB/sec   105.449       {}
       ModeKernelWide<Int64Type>/1048576/10000  78.890 MiB/sec 158.923 MiB/sec   101.450       {}
        ModeKernelNarrow<Int64Type>/1048576/10 994.277 MiB/sec   1.895 GiB/sec    95.196       {}
           ModeKernelWide<Int32Type>/1048576/2  70.771 MiB/sec 136.449 MiB/sec    92.804       {}
           ModeKernelWide<Int64Type>/1048576/0  79.140 MiB/sec 150.926 MiB/sec    90.709       {}
         ModeKernelWide<DoubleType>/1048576/10  72.236 MiB/sec 136.781 MiB/sec    89.354       {}
           ModeKernelWide<Int64Type>/1048576/2 149.834 MiB/sec 282.411 MiB/sec    88.482       {}
          ModeKernelWide<FloatType>/1048576/10  34.595 MiB/sec  65.197 MiB/sec    88.457       {}
          ModeKernelWide<Int32Type>/1048576/10  44.789 MiB/sec  82.908 MiB/sec    85.107       {}
         ModeKernelWide<Int32Type>/1048576/100  41.497 MiB/sec  76.746 MiB/sec    84.946       {}
         ModeKernelNarrow<Int64Type>/1048576/2 799.410 MiB/sec   1.407 GiB/sec    80.226       {}
          ModeKernelWide<Int64Type>/1048576/10  94.448 MiB/sec 168.857 MiB/sec    78.782       {}
         ModeKernelNarrow<Int32Type>/1048576/0   1.099 GiB/sec   1.709 GiB/sec    55.441       {}
        ModeKernelNarrow<Int32Type>/1048576/10 700.749 MiB/sec   1.059 GiB/sec    54.802       {}
         ModeKernelNarrow<Int32Type>/1048576/2 500.474 MiB/sec 727.773 MiB/sec    45.417       {}
       ModeKernelNarrow<Int32Type>/1048576/100   1.083 GiB/sec   1.529 GiB/sec    41.151       {}
     ModeKernelNarrow<Int32Type>/1048576/10000   1.258 GiB/sec   1.658 GiB/sec    31.790       {}
          ModeKernelNarrow<Int8Type>/1048576/1 504.487 GiB/sec 625.852 GiB/sec    24.057       {}
           ModeKernelWide<Int32Type>/1048576/1 584.030 GiB/sec 704.471 GiB/sec    20.622       {}
         ModeKernelNarrow<Int32Type>/1048576/1 595.824 GiB/sec 712.818 GiB/sec    19.636       {}
         ModeKernelNarrow<Int64Type>/1048576/1 597.643 GiB/sec 707.585 GiB/sec    18.396       {}
           ModeKernelWide<FloatType>/1048576/1 582.116 GiB/sec 679.811 GiB/sec    16.783       {}
          ModeKernelWide<DoubleType>/1048576/1 583.923 GiB/sec 674.269 GiB/sec    15.472       {}
           ModeKernelWide<Int64Type>/1048576/1 590.544 GiB/sec 678.023 GiB/sec    14.813       {}
       ModeKernelNarrow<BooleanType>/1048576/1 593.884 GiB/sec 648.047 GiB/sec     9.120       {}
       ModeKernelNarrow<BooleanType>/1048576/0  36.364 GiB/sec  38.774 GiB/sec     6.626       {}
          ModeKernelNarrow<Int8Type>/1048576/2 366.079 MiB/sec 378.478 MiB/sec     3.387       {}
      ModeKernelNarrow<BooleanType>/1048576/10   2.393 GiB/sec   2.412 GiB/sec     0.784       {}
   ModeKernelNarrow<BooleanType>/1048576/10000   2.392 GiB/sec   2.395 GiB/sec     0.115       {}
       ModeKernelNarrow<BooleanType>/1048576/2   2.393 GiB/sec   2.391 GiB/sec    -0.071       {}
     ModeKernelNarrow<BooleanType>/1048576/100   2.395 GiB/sec   2.369 GiB/sec    -1.084       {}
   
   -------------------------------------------------------------------------------------------
   Regressions: (4)
   -------------------------------------------------------------------------------------------
                                  benchmark        baseline       contender  change % counters
       ModeKernelNarrow<Int8Type>/1048576/0 517.607 MiB/sec 477.913 MiB/sec    -7.669       {}
   ModeKernelNarrow<Int8Type>/1048576/10000 597.905 MiB/sec 473.417 MiB/sec   -20.821       {}
      ModeKernelNarrow<Int8Type>/1048576/10 616.529 MiB/sec 486.274 MiB/sec   -21.127       {}
     ModeKernelNarrow<Int8Type>/1048576/100 652.528 MiB/sec 478.726 MiB/sec   -26.635       {}
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613725348



##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -492,16 +493,8 @@ class ArrayCountOrCompareSorter {
   uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
                  int64_t offset, const ArraySortOptions& options) {
     if (values.length() >= countsort_min_len_ && values.length() > values.null_count()) {
-      c_type min{std::numeric_limits<c_type>::max()};
-      c_type max{std::numeric_limits<c_type>::min()};
-
-      VisitRawValuesInline(
-          values,
-          [&](c_type v) {
-            min = std::min(min, v);
-            max = std::max(max, v);
-          },
-          []() {});
+      c_type min, max;
+      std::tie(min, max) = GetMinMax<c_type>(*values.data());

Review comment:
       A small performance improvement for int64narrow sorting.
   ```
                              benchmark            baseline           contender  change %
    ArraySortIndicesInt64Narrow/32768/2     507.257 MiB/sec     632.995 MiB/sec    24.788
   ArraySortIndicesInt64Narrow/32768/10     643.182 MiB/sec     724.483 MiB/sec    12.640
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818629589


   https://issues.apache.org/jira/browse/ARROW-11568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613725348



##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -492,16 +493,8 @@ class ArrayCountOrCompareSorter {
   uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
                  int64_t offset, const ArraySortOptions& options) {
     if (values.length() >= countsort_min_len_ && values.length() > values.null_count()) {
-      c_type min{std::numeric_limits<c_type>::max()};
-      c_type max{std::numeric_limits<c_type>::min()};
-
-      VisitRawValuesInline(
-          values,
-          [&](c_type v) {
-            min = std::min(min, v);
-            max = std::max(max, v);
-          },
-          []() {});
+      c_type min, max;
+      std::tie(min, max) = GetMinMax<c_type>(*values.data());

Review comment:
       Small performance improvement for int64narrow sorting.
   ```
                              benchmark            baseline           contender  change %
    ArraySortIndicesInt64Narrow/32768/2     507.257 MiB/sec     632.995 MiB/sec    24.788
   ArraySortIndicesInt64Narrow/32768/10     643.182 MiB/sec     724.483 MiB/sec    12.640
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612600080



##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
 
 namespace {
 
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
 constexpr char kModeFieldName[] = "mode";
 constexpr char kCountFieldName[] = "count";
 
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array, int64_t& nan_count) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  nan_count = 0;
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             if (std::isnan(value)) {
-                                               ++nan_count;
-                                             } else {
-                                               ++value_counts_map[value];
-                                             }
-                                           }
-                                         });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+                                                  Datum* out) {
+  const auto& mode_type = TypeTraits<InType>::type_singleton();
+  const auto& count_type = int64();
+
+  auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+  mode_data->buffers.resize(2, nullptr);
+  auto count_data = ArrayData::Make(count_type, n, 0);
+  count_data->buffers.resize(2, nullptr);
+
+  CType* mode_buffer = nullptr;
+  int64_t* count_buffer = nullptr;
+
+  if (n > 0) {
+    ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+    ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+    mode_buffer = mode_data->template GetMutableValues<CType>(1);
+    count_buffer = count_data->template GetMutableValues<int64_t>(1);
   }
 
-  return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
-    const ArrayType& array) {
-  CounterMap<CType> value_counts_map;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_map[values[pos + i]];
-                                           }
-                                         });
-  }
+  const auto& out_type =
+      struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+  *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
 
-  return value_counts_map;
+  return std::make_pair(mode_buffer, count_buffer);
 }
 
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
-  const int range = static_cast<int>(max - min);
-  DCHECK(range >= 0 && range < 64 * 1024 * 1024);
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  std::vector<int64_t> value_counts_vector(range + 1);
-  if (array.length() > array.null_count()) {
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             ++value_counts_vector[values[pos + i] - min];
-                                           }
-                                         });
-  }
-
-  // Transfer value counts to a map to be consistent with other chunks
-  CounterMap<CType> value_counts_map(range + 1);
-  for (int i = 0; i <= range; ++i) {
-    CType value = static_cast<CType>(i + min);
-    int64_t count = value_counts_vector[i];
-    if (count) {
-      value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+  using CType = typename InType::c_type;
+
+  using ValueCountPair = std::pair<CType, int64_t>;
+  auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+    const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
+    return lhs.second > rhs.second ||
+           (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+  };
+
+  std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+      std::move(gt));
+
+  const ModeOptions& options = ModeState::Get(ctx);
+  while (true) {
+    const ValueCountPair& value_count = gen();
+    DCHECK_NE(value_count.second, 0);
+    if (value_count.second < 0) break;  // EOF reached
+    if (static_cast<int64_t>(min_heap.size()) < options.n) {
+      min_heap.push(value_count);
+    } else if (gt(value_count, min_heap.top())) {
+      min_heap.pop();
+      min_heap.push(value_count);
     }
   }
+  const int64_t n = min_heap.size();
 
-  return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
-  // see https://issues.apache.org/jira/browse/ARROW-9873
-  static constexpr int kMinArraySize = 8192 / sizeof(CType);
-  static constexpr int kMaxValueRange = 16384;
-  const ArrayData& data = *array.data();
-  const CType* values = data.GetValues<CType>(1);
-
-  if ((array.length() - array.null_count()) >= kMinArraySize) {
-    CType min = std::numeric_limits<CType>::max();
-    CType max = std::numeric_limits<CType>::min();
-
-    arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
-                                         [&](int64_t pos, int64_t len) {
-                                           for (int64_t i = 0; i < len; ++i) {
-                                             const auto value = values[pos + i];
-                                             min = std::min(min, value);
-                                             max = std::max(max, value);
-                                           }
-                                         });
-
-    if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
-      return CountValuesByVector(array, min, max);
-    }
-  }
-  return CountValuesByMap(array);
-}
+  CType* mode_buffer;
+  int64_t* count_buffer;
+  KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                         PrepareOutput<InType>(n, ctx, out));
 
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  // we need just count ones and zeros
-  CounterMap<CType> map;
-  if (array.length() > array.null_count()) {
-    map[true] = array.true_count();
-    map[false] = array.length() - array.null_count() - map[true];
+  for (int64_t i = n - 1; i >= 0; --i) {
+    std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+    min_heap.pop();
   }
-  nan_count = 0;
-  return map;
 }
 
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  using Limits = std::numeric_limits<CType>;
-  nan_count = 0;
-  return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurrences for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+  using CType = typename T::c_type;
 
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
-            CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMapOrVector(array);
-}
+  CType min;
+  std::vector<int64_t> counts;
 
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>>  // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
-  nan_count = 0;
-  return CountValuesByMap(array, nan_count);
-}
+  CountModer(CType min, CType max) {
+    uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+    DCHECK_LT(value_range, 1 << 20);
+    this->min = min;
+    this->counts.resize(value_range, 0);
+  }
 
-template <typename ArrowType>
-struct ModeState {
-  using ThisType = ModeState<ArrowType>;
-  using CType = typename ArrowType::c_type;
-
-  void MergeFrom(ThisType&& state) {
-    if (this->value_counts.empty()) {
-      this->value_counts = std::move(state.value_counts);
-    } else {
-      for (const auto& value_count : state.value_counts) {
-        auto value = value_count.first;
-        auto count = value_count.second;
-        this->value_counts[value] += count;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // count values in all chunks, ignore nulls
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                ++this->counts[values[pos + i] - this->min];
+                              }
+                            });
       }
     }
-    if (is_floating_type<ArrowType>::value) {
-      this->nan_count += state.nan_count;
-    }
-  }
-
-  // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
-  void Finalize(CType* modes, int64_t* counts, const int64_t n) {
-    DCHECK(n >= 1 && n <= this->DistinctValues());
 
-    // mode 'greater than' comparator: larger count or same count with smaller value
-    using ValueCountPair = std::pair<CType, int64_t>;
-    auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
-      const bool rhs_is_nan = rhs.first != rhs.first;  // nan as largest value
-      return lhs.second > rhs.second ||
-             (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+    // generator to emit next value:count pair
+    int index = 0;
+    auto gen = [&]() {
+      for (; index < static_cast<int>(counts.size()); ++index) {
+        if (counts[index] != 0) {
+          auto value_count =
+              std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+          ++index;
+          return value_count;
+        }
+      }
+      return std::make_pair<CType, int64_t>(0, -1);  // EOF
     };
 
-    // initialize min-heap with first n modes
-    std::vector<ValueCountPair> vector(n);
-    // push nan if exists
-    const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
-    if (has_nan) {
-      vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
-    }
-    // push n or n-1 modes
-    auto it = this->value_counts.cbegin();
-    for (int i = has_nan; i < n; ++i) {
-      vector[i] = *it++;
-    }
-    // turn to min-heap
-    std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
-        min_heap(std::move(mode_gt), std::move(vector));
-
-    // iterate and insert modes into min-heap
-    // - mode < heap top: ignore mode
-    // - mode > heap top: discard heap top, insert mode
-    for (; it != this->value_counts.cend(); ++it) {
-      if (mode_gt(*it, min_heap.top())) {
-        min_heap.pop();
-        min_heap.push(*it);
-      }
+    Finalize<T>(ctx, out, std::move(gen));
+  }
+};
+
+// booleans can be handled more straightforwardly
+template <>
+struct CountModer<BooleanType> {
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    int64_t counts[2]{};
+
+    const Datum& datum = batch[0];
+    for (const auto& array : datum.chunks()) {
+      if (array->length() > array->null_count()) {
+        const int64_t true_count =
+            checked_pointer_cast<BooleanArray>(array)->true_count();
+        const int64_t false_count = array->length() - array->null_count() - true_count;
+        counts[true] += true_count;
+        counts[false] += false_count;
+      };
     }
 
-    // pop modes from min-heap and insert into output array (in reverse order)
-    DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
-    for (int64_t i = n - 1; i >= 0; --i) {
-      std::tie(modes[i], counts[i]) = min_heap.top();
-      min_heap.pop();
+    const ModeOptions& options = ModeState::Get(ctx);
+    const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+    const int64_t n = std::min(options.n, distinct_values);
+
+    bool* mode_buffer;
+    int64_t* count_buffer;
+    KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+                           PrepareOutput<BooleanType>(n, ctx, out));
+
+    if (n >= 1) {
+      const bool index = counts[1] > counts[0];
+      mode_buffer[0] = index;
+      count_buffer[0] = counts[index];
+      if (n == 2) {
+        mode_buffer[1] = !index;
+        count_buffer[1] = counts[!index];
+      }
     }
   }
+};
 
-  int64_t DistinctValues() const {
-    return this->value_counts.size() +
-           (is_floating_type<ArrowType>::value && this->nan_count > 0);
-  }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+  using CType = typename T::c_type;
+  using Allocator = arrow::stl::allocator<CType>;
 
-  int64_t nan_count = 0;  // only make sense to floating types
-  CounterMap<CType> value_counts;
-};
+  int64_t nan_count = 0;
 
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
-  using ThisType = ModeImpl<ArrowType>;
-  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
-  using CType = typename ArrowType::c_type;
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // copy all chunks to a buffer, ignore nulls and nans
+    std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
 
-  ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : out_type(out_type), options(options) {}
+    const Datum& datum = batch[0];
+    const int64_t in_length = datum.length() - datum.null_count();
+    if (in_length > 0) {
+      in_buffer.resize(in_length);
 
-  void Consume(KernelContext*, const ExecBatch& batch) override {
-    ArrayType array(batch[0].array());
-    this->state.value_counts = CountValues(array, this->state.nan_count);
-  }
+      int64_t index = 0;
+      for (const auto& array : datum.chunks()) {
+        index += CopyArray(in_buffer.data() + index, *array);
+      }
 
-  void MergeFrom(KernelContext*, KernelState&& src) override {
-    auto& other = checked_cast<ThisType&>(src);
-    this->state.MergeFrom(std::move(other.state));
-  }
+      // drop nan
+      if (is_floating_type<T>::value) {
+        const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+                                        [](CType v) { return v != v; });
+        this->nan_count = in_buffer.end() - it;
+        in_buffer.resize(it - in_buffer.begin());
+      }
+    }
 
-  static std::shared_ptr<ArrayData> MakeArrayData(
-      const std::shared_ptr<DataType>& data_type, int64_t n) {
-    auto data = ArrayData::Make(data_type, n, 0);
-    data->buffers.resize(2);
-    data->buffers[0] = nullptr;
-    data->buffers[1] = nullptr;
-    return data;
+    // sort the input data to count same values
+    std::sort(in_buffer.begin(), in_buffer.end());
+
+    // generator to emit next value:count pair
+    auto it = in_buffer.cbegin();
+    int64_t nan_count_copy = this->nan_count;
+    auto gen = [&]() {
+      if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+        // handle NAN at last
+        if (nan_count_copy > 0) {
+          auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+          nan_count_copy = 0;
+          return value_count;
+        }
+        return std::make_pair<CType, int64_t>(0, -1);  // EOF
+      }
+      // count same values
+      const CType value = *it;
+      int64_t count = 0;
+      do {
+        ++it;
+        ++count;
+      } while (it != in_buffer.cend() && *it == value);
+      return std::make_pair(value, count);
+    };
+
+    Finalize<T>(ctx, out, std::move(gen));
   }
 
-  void Finalize(KernelContext* ctx, Datum* out) override {
-    const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
-    const auto& count_type = int64();
-    const auto& out_type =
-        struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
-    int64_t n = this->options.n;
-    if (n > state.DistinctValues()) {
-      n = state.DistinctValues();
-    } else if (n < 0) {
-      n = 0;
+  static int64_t CopyArray(CType* buffer, const Array& array) {
+    const int64_t n = array.length() - array.null_count();
+    if (n > 0) {
+      int64_t index = 0;
+      const ArrayData& data = *array.data();
+      const CType* values = data.GetValues<CType>(1);
+      VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                          [&](int64_t pos, int64_t len) {
+                            memcpy(buffer + index, values + pos, len * sizeof(CType));
+                            index += len;
+                          });
+      DCHECK_EQ(index, n);
     }
+    return n;
+  }
+};
 
-    auto mode_data = this->MakeArrayData(mode_type, n);
-    auto count_data = this->MakeArrayData(count_type, n);
-    if (n > 0) {
-      KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(CType)));
-      KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
-                             ctx->Allocate(n * sizeof(int64_t)));
-      CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
-      int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
-      this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+  using CType = typename T::c_type;
+
+  void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    // cross point to benefit from counting approach
+    // about 2x improvement for int32/64 from micro-benchmarking
+    static constexpr int kMinArraySize = 8192;
+    static constexpr int kMaxValueRange = 32768;
+
+    const Datum& datum = batch[0];
+    if (datum.length() - datum.null_count() >= kMinArraySize) {
+      CType min = std::numeric_limits<CType>::max();
+      CType max = std::numeric_limits<CType>::min();
+
+      for (const auto& array : datum.chunks()) {
+        const ArrayData& data = *array->data();
+        const CType* values = data.GetValues<CType>(1);
+        VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+                            [&](int64_t pos, int64_t len) {
+                              for (int64_t i = 0; i < len; ++i) {
+                                min = std::min(min, values[pos + i]);
+                                max = std::max(max, values[pos + i]);
+                              }
+                            });
+      }
+
+      if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
+        CountModer<T>(min, max).Exec(ctx, batch, out);
+        return;
+      }
     }
 
-    *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
+    SortModer<T>().Exec(ctx, batch, out);
   }
+};
 
-  std::shared_ptr<DataType> out_type;
-  ModeState<ArrowType> state;
-  ModeOptions options;
+template <typename InType, typename Enable = void>
+struct Moder;
+
+template <>
+struct Moder<Int8Type> {
+  CountModer<Int8Type> impl;
+  Moder() : impl(-128, 127) {}
 };
 
-struct ModeInitState {
-  std::unique_ptr<KernelState> state;
-  KernelContext* ctx;
-  const DataType& in_type;
-  const std::shared_ptr<DataType>& out_type;
-  const ModeOptions& options;
+template <>
+struct Moder<UInt8Type> {
+  CountModer<UInt8Type> impl;
+  Moder() : impl(0, 255) {}
+};
 
-  ModeInitState(KernelContext* ctx, const DataType& in_type,
-                const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
-      : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}
+template <>
+struct Moder<BooleanType> {
+  CountModer<BooleanType> impl;
+};
 
-  Status Visit(const DataType&) { return Status::NotImplemented("No mode implemented"); }
+template <typename InType>
+struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
+                                  (sizeof(typename InType::c_type) > 1))>> {
+  CountOrSortModer<InType> impl;
+};
 
-  Status Visit(const HalfFloatType&) {
-    return Status::NotImplemented("No mode implemented");
-  }
+template <typename InType>
+struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
+  SortModer<InType> impl;
+};
 
-  template <typename Type>
-  enable_if_t<is_number_type<Type>::value || is_boolean_type<Type>::value, Status> Visit(
-      const Type&) {
-    state.reset(new ModeImpl<Type>(out_type, options));
-    return Status::OK();
-  }
+template <typename _, typename InType>
+struct ModeExecutor {
+  static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    if (ctx->state() == nullptr) {
+      ctx->SetStatus(Status::Invalid("Mode requires ModeOptions"));
+      return;
+    }
+    const ModeOptions& options = ModeState::Get(ctx);
+    if (options.n <= 0) {
+      ctx->SetStatus(Status::Invalid("ModeOption::n must be positive"));

Review comment:
       "strictly positive" or "> 0"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-819167274


   > Do the existing tests exercise both the narrow and wide cases?
   
   Yes. https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/aggregate_test.cc#L1075-L1082
   I did find a comment that should be updated: `hashmap-based` -> `sorter-based`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10009:
URL: https://github.com/apache/arrow/pull/10009


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] cyb70289 commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel

Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818452704


   Benchmark on Skylake, clang-9.
   
   ```
   -----------------------------------------------------------------------------------------------
   Non-regressions: (40)
   -----------------------------------------------------------------------------------------------
                                    benchmark         baseline        contender  change % counters
          ModeKernelWide<Int32Type>/1048576/0   20.644 MiB/sec   66.187 MiB/sec   220.613       {}
        ModeKernelWide<Int32Type>/1048576/100   20.768 MiB/sec   65.772 MiB/sec   216.699       {}
      ModeKernelWide<Int32Type>/1048576/10000   20.626 MiB/sec   65.291 MiB/sec   216.546       {}
         ModeKernelWide<Int32Type>/1048576/10   22.390 MiB/sec   69.904 MiB/sec   212.209       {}
      ModeKernelWide<FloatType>/1048576/10000   18.174 MiB/sec   54.517 MiB/sec   199.975       {}
          ModeKernelWide<FloatType>/1048576/0   18.423 MiB/sec   54.869 MiB/sec   197.826       {}
         ModeKernelWide<FloatType>/1048576/10   20.009 MiB/sec   59.346 MiB/sec   196.601       {}
        ModeKernelWide<FloatType>/1048576/100   18.486 MiB/sec   54.568 MiB/sec   195.186       {}
        ModeKernelWide<Int64Type>/1048576/100   46.593 MiB/sec  136.786 MiB/sec   193.578       {}
          ModeKernelWide<Int64Type>/1048576/0   46.434 MiB/sec  134.296 MiB/sec   189.221       {}
      ModeKernelWide<Int64Type>/1048576/10000   46.435 MiB/sec  134.149 MiB/sec   188.894       {}
        ModeKernelNarrow<Int64Type>/1048576/0  893.469 MiB/sec    2.515 GiB/sec   188.197       {}
    ModeKernelNarrow<Int64Type>/1048576/10000  886.403 MiB/sec    2.484 GiB/sec   186.933       {}
         ModeKernelWide<Int64Type>/1048576/10   50.182 MiB/sec  143.034 MiB/sec   185.029       {}
     ModeKernelWide<DoubleType>/1048576/10000   40.525 MiB/sec  115.477 MiB/sec   184.953       {}
       ModeKernelWide<DoubleType>/1048576/100   41.114 MiB/sec  116.862 MiB/sec   184.241       {}
        ModeKernelWide<DoubleType>/1048576/10   44.555 MiB/sec  126.095 MiB/sec   183.008       {}
         ModeKernelWide<DoubleType>/1048576/0   41.083 MiB/sec  113.499 MiB/sec   176.263       {}
          ModeKernelWide<FloatType>/1048576/2   39.704 MiB/sec  106.142 MiB/sec   167.335       {}
      ModeKernelNarrow<Int64Type>/1048576/100  853.217 MiB/sec    2.223 GiB/sec   166.836       {}
          ModeKernelWide<Int32Type>/1048576/2   43.329 MiB/sec  115.209 MiB/sec   165.893       {}
         ModeKernelWide<DoubleType>/1048576/2   88.485 MiB/sec  223.284 MiB/sec   152.340       {}
          ModeKernelWide<Int64Type>/1048576/2   97.572 MiB/sec  236.831 MiB/sec   142.723       {}
        ModeKernelNarrow<Int32Type>/1048576/0  715.706 MiB/sec    1.522 GiB/sec   117.703       {}
    ModeKernelNarrow<Int32Type>/1048576/10000  714.237 MiB/sec    1.497 GiB/sec   114.577       {}
       ModeKernelNarrow<Int64Type>/1048576/10  701.986 MiB/sec    1.399 GiB/sec   104.094       {}
      ModeKernelNarrow<Int32Type>/1048576/100  666.834 MiB/sec    1.291 GiB/sec    98.234       {}
        ModeKernelNarrow<Int64Type>/1048576/2  611.412 MiB/sec    1.057 GiB/sec    77.008       {}
       ModeKernelNarrow<Int32Type>/1048576/10  499.852 MiB/sec  780.344 MiB/sec    56.115       {}
        ModeKernelNarrow<Int32Type>/1048576/2  402.986 MiB/sec  562.758 MiB/sec    39.647       {}
         ModeKernelNarrow<Int8Type>/1048576/1  499.705 GiB/sec  639.311 GiB/sec    27.938       {}
         ModeKernelWide<DoubleType>/1048576/1  599.969 GiB/sec  738.218 GiB/sec    23.043       {}
        ModeKernelNarrow<Int64Type>/1048576/1  610.213 GiB/sec  743.338 GiB/sec    21.816       {}
          ModeKernelWide<Int64Type>/1048576/1  608.551 GiB/sec  735.741 GiB/sec    20.901       {}
          ModeKernelWide<Int32Type>/1048576/1  611.261 GiB/sec  727.163 GiB/sec    18.961       {}
        ModeKernelNarrow<Int32Type>/1048576/1  601.834 GiB/sec  714.017 GiB/sec    18.640       {}
          ModeKernelWide<FloatType>/1048576/1  612.628 GiB/sec  716.591 GiB/sec    16.970       {}
      ModeKernelNarrow<BooleanType>/1048576/1  628.851 GiB/sec  691.936 GiB/sec    10.032       {}
      ModeKernelNarrow<BooleanType>/1048576/0   21.709 GiB/sec   22.397 GiB/sec     3.166       {}
         ModeKernelNarrow<Int8Type>/1048576/2  293.420 MiB/sec  286.770 MiB/sec    -2.266       {}
   
   -------------------------------------------------------------------------------------------------
   Regressions: (8)
   -------------------------------------------------------------------------------------------------
                                      benchmark         baseline        contender  change % counters
    ModeKernelNarrow<BooleanType>/1048576/10000    1.959 GiB/sec    1.856 GiB/sec    -5.274       {}
        ModeKernelNarrow<BooleanType>/1048576/2    1.957 GiB/sec    1.854 GiB/sec    -5.293       {}
      ModeKernelNarrow<BooleanType>/1048576/100    1.972 GiB/sec    1.866 GiB/sec    -5.374       {}
       ModeKernelNarrow<BooleanType>/1048576/10    1.974 GiB/sec    1.855 GiB/sec    -6.027       {}
          ModeKernelNarrow<Int8Type>/1048576/10  364.472 MiB/sec  342.415 MiB/sec    -6.052       {}
           ModeKernelNarrow<Int8Type>/1048576/0  484.491 MiB/sec  440.347 MiB/sec    -9.111       {}
         ModeKernelNarrow<Int8Type>/1048576/100  458.333 MiB/sec  416.045 MiB/sec    -9.226       {}
       ModeKernelNarrow<Int8Type>/1048576/10000  486.679 MiB/sec  439.186 MiB/sec    -9.759       {}
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org