You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/13 05:38:49 UTC
[GitHub] [arrow] cyb70289 opened a new pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
cyb70289 opened a new pull request #10009:
URL: https://github.com/apache/arrow/pull/10009
Arrow's mode kernel performs poorly compared with scipy.stats.mode
(which is based on numpy.unique). The Arrow mode kernel stores value:count
pairs in a map, while numpy.unique sorts the input array and then counts
runs of adjacent equal values. In my tests, the map approach only wins
when there are many duplicated values (length / value_range > 100), which
does not seem very useful in practice.
This patch rewrites the mode kernel to use the sort-and-count approach for
floating-point values and for integers with a wide value range. Integers
with a narrow value range (and booleans) still count values directly.
A 2x performance improvement is observed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612891573
##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
namespace {
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
constexpr char kModeFieldName[] = "mode";
constexpr char kCountFieldName[] = "count";
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array, int64_t& nan_count) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- nan_count = 0;
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- if (std::isnan(value)) {
- ++nan_count;
- } else {
- ++value_counts_map[value];
- }
- }
- });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+ Datum* out) {
+ const auto& mode_type = TypeTraits<InType>::type_singleton();
+ const auto& count_type = int64();
+
+ auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+ mode_data->buffers.resize(2, nullptr);
+ auto count_data = ArrayData::Make(count_type, n, 0);
+ count_data->buffers.resize(2, nullptr);
+
+ CType* mode_buffer = nullptr;
+ int64_t* count_buffer = nullptr;
+
+ if (n > 0) {
+ ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+ ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+ mode_buffer = mode_data->template GetMutableValues<CType>(1);
+ count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_map[values[pos + i]];
- }
- });
- }
+ const auto& out_type =
+ struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+ *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
- return value_counts_map;
+ return std::make_pair(mode_buffer, count_buffer);
}
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
- const int range = static_cast<int>(max - min);
- DCHECK(range >= 0 && range < 64 * 1024 * 1024);
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- std::vector<int64_t> value_counts_vector(range + 1);
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_vector[values[pos + i] - min];
- }
- });
- }
-
- // Transfer value counts to a map to be consistent with other chunks
- CounterMap<CType> value_counts_map(range + 1);
- for (int i = 0; i <= range; ++i) {
- CType value = static_cast<CType>(i + min);
- int64_t count = value_counts_vector[i];
- if (count) {
- value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+ using CType = typename InType::c_type;
+
+ using ValueCountPair = std::pair<CType, int64_t>;
+ auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+ const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
+ return lhs.second > rhs.second ||
+ (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ };
+
+ std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+ std::move(gt));
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ while (true) {
+ const ValueCountPair& value_count = gen();
+ DCHECK_NE(value_count.second, 0);
+ if (value_count.second < 0) break; // EOF reached
+ if (static_cast<int64_t>(min_heap.size()) < options.n) {
+ min_heap.push(value_count);
+ } else if (gt(value_count, min_heap.top())) {
+ min_heap.pop();
+ min_heap.push(value_count);
}
}
+ const int64_t n = min_heap.size();
- return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
- // see https://issues.apache.org/jira/browse/ARROW-9873
- static constexpr int kMinArraySize = 8192 / sizeof(CType);
- static constexpr int kMaxValueRange = 16384;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if ((array.length() - array.null_count()) >= kMinArraySize) {
- CType min = std::numeric_limits<CType>::max();
- CType max = std::numeric_limits<CType>::min();
-
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- min = std::min(min, value);
- max = std::max(max, value);
- }
- });
-
- if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
- return CountValuesByVector(array, min, max);
- }
- }
- return CountValuesByMap(array);
-}
+ CType* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<InType>(n, ctx, out));
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- // we need just count ones and zeros
- CounterMap<CType> map;
- if (array.length() > array.null_count()) {
- map[true] = array.true_count();
- map[false] = array.length() - array.null_count() - map[true];
+ for (int64_t i = n - 1; i >= 0; --i) {
+ std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+ min_heap.pop();
}
- nan_count = 0;
- return map;
}
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- using Limits = std::numeric_limits<CType>;
- nan_count = 0;
- return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurances for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+ using CType = typename T::c_type;
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMapOrVector(array);
-}
+ CType min;
+ std::vector<int64_t> counts;
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>> // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMap(array, nan_count);
-}
+ CountModer(CType min, CType max) {
+ uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+ DCHECK_LT(value_range, 1 << 20);
+ this->min = min;
+ this->counts.resize(value_range, 0);
+ }
-template <typename ArrowType>
-struct ModeState {
- using ThisType = ModeState<ArrowType>;
- using CType = typename ArrowType::c_type;
-
- void MergeFrom(ThisType&& state) {
- if (this->value_counts.empty()) {
- this->value_counts = std::move(state.value_counts);
- } else {
- for (const auto& value_count : state.value_counts) {
- auto value = value_count.first;
- auto count = value_count.second;
- this->value_counts[value] += count;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // count values in all chunks, ignore nulls
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ ++this->counts[values[pos + i] - this->min];
+ }
+ });
}
}
- if (is_floating_type<ArrowType>::value) {
- this->nan_count += state.nan_count;
- }
- }
-
- // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
- void Finalize(CType* modes, int64_t* counts, const int64_t n) {
- DCHECK(n >= 1 && n <= this->DistinctValues());
- // mode 'greater than' comparator: larger count or same count with smaller value
- using ValueCountPair = std::pair<CType, int64_t>;
- auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
- const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
- return lhs.second > rhs.second ||
- (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ // generator to emit next value:count pair
+ int index = 0;
+ auto gen = [&]() {
+ for (; index < static_cast<int>(counts.size()); ++index) {
+ if (counts[index] != 0) {
+ auto value_count =
+ std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+ ++index;
+ return value_count;
+ }
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
};
- // initialize min-heap with first n modes
- std::vector<ValueCountPair> vector(n);
- // push nan if exists
- const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
- if (has_nan) {
- vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
- }
- // push n or n-1 modes
- auto it = this->value_counts.cbegin();
- for (int i = has_nan; i < n; ++i) {
- vector[i] = *it++;
- }
- // turn to min-heap
- std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
- min_heap(std::move(mode_gt), std::move(vector));
-
- // iterate and insert modes into min-heap
- // - mode < heap top: ignore mode
- // - mode > heap top: discard heap top, insert mode
- for (; it != this->value_counts.cend(); ++it) {
- if (mode_gt(*it, min_heap.top())) {
- min_heap.pop();
- min_heap.push(*it);
- }
+ Finalize<T>(ctx, out, std::move(gen));
+ }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ int64_t counts[2]{};
+
+ const Datum& datum = batch[0];
+ for (const auto& array : datum.chunks()) {
+ if (array->length() > array->null_count()) {
+ const int64_t true_count =
+ checked_pointer_cast<BooleanArray>(array)->true_count();
+ const int64_t false_count = array->length() - array->null_count() - true_count;
+ counts[true] += true_count;
+ counts[false] += false_count;
+ };
}
- // pop modes from min-heap and insert into output array (in reverse order)
- DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
- for (int64_t i = n - 1; i >= 0; --i) {
- std::tie(modes[i], counts[i]) = min_heap.top();
- min_heap.pop();
+ const ModeOptions& options = ModeState::Get(ctx);
+ const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+ const int64_t n = std::min(options.n, distinct_values);
+
+ bool* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<BooleanType>(n, ctx, out));
+
+ if (n >= 1) {
+ const bool index = counts[1] > counts[0];
+ mode_buffer[0] = index;
+ count_buffer[0] = counts[index];
+ if (n == 2) {
+ mode_buffer[1] = !index;
+ count_buffer[1] = counts[!index];
+ }
}
}
+};
- int64_t DistinctValues() const {
- return this->value_counts.size() +
- (is_floating_type<ArrowType>::value && this->nan_count > 0);
- }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+ using CType = typename T::c_type;
+ using Allocator = arrow::stl::allocator<CType>;
- int64_t nan_count = 0; // only make sense to floating types
- CounterMap<CType> value_counts;
-};
+ int64_t nan_count = 0;
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
- using ThisType = ModeImpl<ArrowType>;
- using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
- using CType = typename ArrowType::c_type;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // copy all chunks to a buffer, ignore nulls and nans
+ std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
- ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : out_type(out_type), options(options) {}
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ in_buffer.resize(in_length);
- void Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.value_counts = CountValues(array, this->state.nan_count);
- }
+ int64_t index = 0;
+ for (const auto& array : datum.chunks()) {
+ index += CopyArray(in_buffer.data() + index, *array);
+ }
- void MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast<ThisType&>(src);
- this->state.MergeFrom(std::move(other.state));
- }
+ // drop nan
+ if (is_floating_type<T>::value) {
+ const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+ [](CType v) { return v != v; });
+ this->nan_count = in_buffer.end() - it;
+ in_buffer.resize(it - in_buffer.begin());
+ }
+ }
- static std::shared_ptr<ArrayData> MakeArrayData(
- const std::shared_ptr<DataType>& data_type, int64_t n) {
- auto data = ArrayData::Make(data_type, n, 0);
- data->buffers.resize(2);
- data->buffers[0] = nullptr;
- data->buffers[1] = nullptr;
- return data;
+ // sort the input data to count same values
+ std::sort(in_buffer.begin(), in_buffer.end());
+
+ // generator to emit next value:count pair
+ auto it = in_buffer.cbegin();
+ int64_t nan_count_copy = this->nan_count;
+ auto gen = [&]() {
+ if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+ // handle NAN at last
+ if (nan_count_copy > 0) {
+ auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+ nan_count_copy = 0;
+ return value_count;
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
+ }
+ // count same values
+ const CType value = *it;
+ int64_t count = 0;
+ do {
+ ++it;
+ ++count;
+ } while (it != in_buffer.cend() && *it == value);
+ return std::make_pair(value, count);
+ };
+
+ Finalize<T>(ctx, out, std::move(gen));
}
- void Finalize(KernelContext* ctx, Datum* out) override {
- const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
- const auto& count_type = int64();
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
- int64_t n = this->options.n;
- if (n > state.DistinctValues()) {
- n = state.DistinctValues();
- } else if (n < 0) {
- n = 0;
+ static int64_t CopyArray(CType* buffer, const Array& array) {
+ const int64_t n = array.length() - array.null_count();
+ if (n > 0) {
+ int64_t index = 0;
+ const ArrayData& data = *array.data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ memcpy(buffer + index, values + pos, len * sizeof(CType));
+ index += len;
+ });
+ DCHECK_EQ(index, n);
}
+ return n;
+ }
+};
- auto mode_data = this->MakeArrayData(mode_type, n);
- auto count_data = this->MakeArrayData(count_type, n);
- if (n > 0) {
- KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(CType)));
- KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(int64_t)));
- CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
- int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
- this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+ using CType = typename T::c_type;
+
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // cross point to benefit from counting approach
+ // about 2x improvement for int32/64 from micro-benchmarking
+ static constexpr int kMinArraySize = 8192;
+ static constexpr int kMaxValueRange = 32768;
+
+ const Datum& datum = batch[0];
+ if (datum.length() - datum.null_count() >= kMinArraySize) {
+ CType min = std::numeric_limits<CType>::max();
+ CType max = std::numeric_limits<CType>::min();
+
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ min = std::min(min, values[pos + i]);
+ max = std::max(max, values[pos + i]);
+ }
+ });
+ }
+
+ if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
+ CountModer<T>(min, max).Exec(ctx, batch, out);
+ return;
+ }
}
- *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
+ SortModer<T>().Exec(ctx, batch, out);
}
+};
- std::shared_ptr<DataType> out_type;
- ModeState<ArrowType> state;
- ModeOptions options;
+template <typename InType, typename Enable = void>
+struct Moder;
+
+template <>
+struct Moder<Int8Type> {
+ CountModer<Int8Type> impl;
+ Moder() : impl(-128, 127) {}
};
-struct ModeInitState {
- std::unique_ptr<KernelState> state;
- KernelContext* ctx;
- const DataType& in_type;
- const std::shared_ptr<DataType>& out_type;
- const ModeOptions& options;
+template <>
+struct Moder<UInt8Type> {
+ CountModer<UInt8Type> impl;
+ Moder() : impl(0, 255) {}
+};
- ModeInitState(KernelContext* ctx, const DataType& in_type,
- const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}
+template <>
+struct Moder<BooleanType> {
+ CountModer<BooleanType> impl;
+};
- Status Visit(const DataType&) { return Status::NotImplemented("No mode implemented"); }
+template <typename InType>
+struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
+ (sizeof(typename InType::c_type) > 1))>> {
+ CountOrSortModer<InType> impl;
+};
- Status Visit(const HalfFloatType&) {
- return Status::NotImplemented("No mode implemented");
- }
+template <typename InType>
+struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
+ SortModer<InType> impl;
+};
- template <typename Type>
- enable_if_t<is_number_type<Type>::value || is_boolean_type<Type>::value, Status> Visit(
- const Type&) {
- state.reset(new ModeImpl<Type>(out_type, options));
- return Status::OK();
- }
+template <typename _, typename InType>
+struct ModeExecutor {
+ static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (ctx->state() == nullptr) {
+ ctx->SetStatus(Status::Invalid("Mode requires ModeOptions"));
+ return;
+ }
+ const ModeOptions& options = ModeState::Get(ctx);
+ if (options.n <= 0) {
+ ctx->SetStatus(Status::Invalid("ModeOption::n must be positive"));
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613724252
##########
File path: cpp/src/arrow/util/bit_run_reader.h
##########
@@ -480,8 +480,8 @@ Status VisitSetBitRuns(const uint8_t* bitmap, int64_t offset, int64_t length,
}
template <typename Visit>
-void VisitSetBitRunsVoid(const uint8_t* bitmap, int64_t offset, int64_t length,
- Visit&& visit) {
+inline void VisitSetBitRunsVoid(const uint8_t* bitmap, int64_t offset, int64_t length,
Review comment:
Add an `inline` hint.
When the caller is in a .cc source file, the compiler is willing to inline this function. But when the caller is in a header file, the compiler prefers not to inline it, even though in practice inlining there does not increase binary size any more than it does for a caller in a source file. Not inlining causes a big performance drop, because the visitor becomes a function call and cannot be optimized together with the loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-820516174
I checked GitHub Actions and Travis CI on my fork; will merge.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613727094
##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
namespace {
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
constexpr char kModeFieldName[] = "mode";
constexpr char kCountFieldName[] = "count";
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array, int64_t& nan_count) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- nan_count = 0;
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- if (std::isnan(value)) {
- ++nan_count;
- } else {
- ++value_counts_map[value];
- }
- }
- });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+ Datum* out) {
+ const auto& mode_type = TypeTraits<InType>::type_singleton();
+ const auto& count_type = int64();
+
+ auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+ mode_data->buffers.resize(2, nullptr);
+ auto count_data = ArrayData::Make(count_type, n, 0);
+ count_data->buffers.resize(2, nullptr);
+
+ CType* mode_buffer = nullptr;
+ int64_t* count_buffer = nullptr;
+
+ if (n > 0) {
+ ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+ ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+ mode_buffer = mode_data->template GetMutableValues<CType>(1);
+ count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_map[values[pos + i]];
- }
- });
- }
+ const auto& out_type =
+ struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+ *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
- return value_counts_map;
+ return std::make_pair(mode_buffer, count_buffer);
}
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
- const int range = static_cast<int>(max - min);
- DCHECK(range >= 0 && range < 64 * 1024 * 1024);
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- std::vector<int64_t> value_counts_vector(range + 1);
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_vector[values[pos + i] - min];
- }
- });
- }
-
- // Transfer value counts to a map to be consistent with other chunks
- CounterMap<CType> value_counts_map(range + 1);
- for (int i = 0; i <= range; ++i) {
- CType value = static_cast<CType>(i + min);
- int64_t count = value_counts_vector[i];
- if (count) {
- value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+ using CType = typename InType::c_type;
+
+ using ValueCountPair = std::pair<CType, int64_t>;
+ auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+ const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
+ return lhs.second > rhs.second ||
+ (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ };
+
+ std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+ std::move(gt));
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ while (true) {
+ const ValueCountPair& value_count = gen();
+ DCHECK_NE(value_count.second, 0);
+ if (value_count.second < 0) break; // EOF reached
+ if (static_cast<int64_t>(min_heap.size()) < options.n) {
+ min_heap.push(value_count);
+ } else if (gt(value_count, min_heap.top())) {
+ min_heap.pop();
+ min_heap.push(value_count);
}
}
+ const int64_t n = min_heap.size();
- return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
- // see https://issues.apache.org/jira/browse/ARROW-9873
- static constexpr int kMinArraySize = 8192 / sizeof(CType);
- static constexpr int kMaxValueRange = 16384;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if ((array.length() - array.null_count()) >= kMinArraySize) {
- CType min = std::numeric_limits<CType>::max();
- CType max = std::numeric_limits<CType>::min();
-
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- min = std::min(min, value);
- max = std::max(max, value);
- }
- });
-
- if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
- return CountValuesByVector(array, min, max);
- }
- }
- return CountValuesByMap(array);
-}
+ CType* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<InType>(n, ctx, out));
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- // we need just count ones and zeros
- CounterMap<CType> map;
- if (array.length() > array.null_count()) {
- map[true] = array.true_count();
- map[false] = array.length() - array.null_count() - map[true];
+ for (int64_t i = n - 1; i >= 0; --i) {
+ std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+ min_heap.pop();
}
- nan_count = 0;
- return map;
}
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- using Limits = std::numeric_limits<CType>;
- nan_count = 0;
- return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurances for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+ using CType = typename T::c_type;
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMapOrVector(array);
-}
+ CType min;
+ std::vector<int64_t> counts;
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>> // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMap(array, nan_count);
-}
+ CountModer(CType min, CType max) {
+ uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+ DCHECK_LT(value_range, 1 << 20);
+ this->min = min;
+ this->counts.resize(value_range, 0);
+ }
-template <typename ArrowType>
-struct ModeState {
- using ThisType = ModeState<ArrowType>;
- using CType = typename ArrowType::c_type;
-
- void MergeFrom(ThisType&& state) {
- if (this->value_counts.empty()) {
- this->value_counts = std::move(state.value_counts);
- } else {
- for (const auto& value_count : state.value_counts) {
- auto value = value_count.first;
- auto count = value_count.second;
- this->value_counts[value] += count;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // count values in all chunks, ignore nulls
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ ++this->counts[values[pos + i] - this->min];
+ }
+ });
}
}
- if (is_floating_type<ArrowType>::value) {
- this->nan_count += state.nan_count;
- }
- }
-
- // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
- void Finalize(CType* modes, int64_t* counts, const int64_t n) {
- DCHECK(n >= 1 && n <= this->DistinctValues());
- // mode 'greater than' comparator: larger count or same count with smaller value
- using ValueCountPair = std::pair<CType, int64_t>;
- auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
- const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
- return lhs.second > rhs.second ||
- (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ // generator to emit next value:count pair
+ int index = 0;
+ auto gen = [&]() {
+ for (; index < static_cast<int>(counts.size()); ++index) {
+ if (counts[index] != 0) {
+ auto value_count =
+ std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+ ++index;
+ return value_count;
+ }
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
};
- // initialize min-heap with first n modes
- std::vector<ValueCountPair> vector(n);
- // push nan if exists
- const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
- if (has_nan) {
- vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
- }
- // push n or n-1 modes
- auto it = this->value_counts.cbegin();
- for (int i = has_nan; i < n; ++i) {
- vector[i] = *it++;
- }
- // turn to min-heap
- std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
- min_heap(std::move(mode_gt), std::move(vector));
-
- // iterate and insert modes into min-heap
- // - mode < heap top: ignore mode
- // - mode > heap top: discard heap top, insert mode
- for (; it != this->value_counts.cend(); ++it) {
- if (mode_gt(*it, min_heap.top())) {
- min_heap.pop();
- min_heap.push(*it);
- }
+ Finalize<T>(ctx, out, std::move(gen));
+ }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ int64_t counts[2]{};
+
+ const Datum& datum = batch[0];
+ for (const auto& array : datum.chunks()) {
+ if (array->length() > array->null_count()) {
+ const int64_t true_count =
+ checked_pointer_cast<BooleanArray>(array)->true_count();
+ const int64_t false_count = array->length() - array->null_count() - true_count;
+ counts[true] += true_count;
+ counts[false] += false_count;
+ };
}
- // pop modes from min-heap and insert into output array (in reverse order)
- DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
- for (int64_t i = n - 1; i >= 0; --i) {
- std::tie(modes[i], counts[i]) = min_heap.top();
- min_heap.pop();
+ const ModeOptions& options = ModeState::Get(ctx);
+ const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+ const int64_t n = std::min(options.n, distinct_values);
+
+ bool* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<BooleanType>(n, ctx, out));
+
+ if (n >= 1) {
+ const bool index = counts[1] > counts[0];
+ mode_buffer[0] = index;
+ count_buffer[0] = counts[index];
+ if (n == 2) {
+ mode_buffer[1] = !index;
+ count_buffer[1] = counts[!index];
+ }
}
}
+};
- int64_t DistinctValues() const {
- return this->value_counts.size() +
- (is_floating_type<ArrowType>::value && this->nan_count > 0);
- }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+ using CType = typename T::c_type;
+ using Allocator = arrow::stl::allocator<CType>;
- int64_t nan_count = 0; // only make sense to floating types
- CounterMap<CType> value_counts;
-};
+ int64_t nan_count = 0;
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
- using ThisType = ModeImpl<ArrowType>;
- using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
- using CType = typename ArrowType::c_type;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // copy all chunks to a buffer, ignore nulls and nans
+ std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
- ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : out_type(out_type), options(options) {}
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ in_buffer.resize(in_length);
- void Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.value_counts = CountValues(array, this->state.nan_count);
- }
+ int64_t index = 0;
+ for (const auto& array : datum.chunks()) {
+ index += CopyArray(in_buffer.data() + index, *array);
+ }
- void MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast<ThisType&>(src);
- this->state.MergeFrom(std::move(other.state));
- }
+ // drop nan
+ if (is_floating_type<T>::value) {
+ const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+ [](CType v) { return v != v; });
+ this->nan_count = in_buffer.end() - it;
+ in_buffer.resize(it - in_buffer.begin());
+ }
+ }
- static std::shared_ptr<ArrayData> MakeArrayData(
- const std::shared_ptr<DataType>& data_type, int64_t n) {
- auto data = ArrayData::Make(data_type, n, 0);
- data->buffers.resize(2);
- data->buffers[0] = nullptr;
- data->buffers[1] = nullptr;
- return data;
+ // sort the input data to count same values
+ std::sort(in_buffer.begin(), in_buffer.end());
+
+ // generator to emit next value:count pair
+ auto it = in_buffer.cbegin();
+ int64_t nan_count_copy = this->nan_count;
+ auto gen = [&]() {
+ if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+ // handle NAN at last
+ if (nan_count_copy > 0) {
+ auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+ nan_count_copy = 0;
+ return value_count;
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
+ }
+ // count same values
+ const CType value = *it;
+ int64_t count = 0;
+ do {
+ ++it;
+ ++count;
+ } while (it != in_buffer.cend() && *it == value);
+ return std::make_pair(value, count);
+ };
+
+ Finalize<T>(ctx, out, std::move(gen));
}
- void Finalize(KernelContext* ctx, Datum* out) override {
- const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
- const auto& count_type = int64();
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
- int64_t n = this->options.n;
- if (n > state.DistinctValues()) {
- n = state.DistinctValues();
- } else if (n < 0) {
- n = 0;
+ static int64_t CopyArray(CType* buffer, const Array& array) {
+ const int64_t n = array.length() - array.null_count();
+ if (n > 0) {
+ int64_t index = 0;
+ const ArrayData& data = *array.data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ memcpy(buffer + index, values + pos, len * sizeof(CType));
+ index += len;
+ });
+ DCHECK_EQ(index, n);
}
+ return n;
+ }
+};
- auto mode_data = this->MakeArrayData(mode_type, n);
- auto count_data = this->MakeArrayData(count_type, n);
- if (n > 0) {
- KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(CType)));
- KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(int64_t)));
- CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
- int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
- this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+ using CType = typename T::c_type;
+
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // cross point to benefit from counting approach
+ // about 2x improvement for int32/64 from micro-benchmarking
+ static constexpr int kMinArraySize = 8192;
+ static constexpr int kMaxValueRange = 32768;
+
+ const Datum& datum = batch[0];
+ if (datum.length() - datum.null_count() >= kMinArraySize) {
+ CType min = std::numeric_limits<CType>::max();
+ CType max = std::numeric_limits<CType>::min();
+
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ min = std::min(min, values[pos + i]);
+ max = std::max(max, values[pos + i]);
+ }
+ });
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818871509
Do the existing tests exercise both the narrow and wide cases?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612599321
##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
namespace {
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
constexpr char kModeFieldName[] = "mode";
constexpr char kCountFieldName[] = "count";
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array, int64_t& nan_count) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- nan_count = 0;
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- if (std::isnan(value)) {
- ++nan_count;
- } else {
- ++value_counts_map[value];
- }
- }
- });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+ Datum* out) {
+ const auto& mode_type = TypeTraits<InType>::type_singleton();
+ const auto& count_type = int64();
+
+ auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+ mode_data->buffers.resize(2, nullptr);
+ auto count_data = ArrayData::Make(count_type, n, 0);
+ count_data->buffers.resize(2, nullptr);
+
+ CType* mode_buffer = nullptr;
+ int64_t* count_buffer = nullptr;
+
+ if (n > 0) {
+ ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+ ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+ mode_buffer = mode_data->template GetMutableValues<CType>(1);
+ count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_map[values[pos + i]];
- }
- });
- }
+ const auto& out_type =
+ struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+ *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
- return value_counts_map;
+ return std::make_pair(mode_buffer, count_buffer);
}
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
- const int range = static_cast<int>(max - min);
- DCHECK(range >= 0 && range < 64 * 1024 * 1024);
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- std::vector<int64_t> value_counts_vector(range + 1);
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_vector[values[pos + i] - min];
- }
- });
- }
-
- // Transfer value counts to a map to be consistent with other chunks
- CounterMap<CType> value_counts_map(range + 1);
- for (int i = 0; i <= range; ++i) {
- CType value = static_cast<CType>(i + min);
- int64_t count = value_counts_vector[i];
- if (count) {
- value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+ using CType = typename InType::c_type;
+
+ using ValueCountPair = std::pair<CType, int64_t>;
+ auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+ const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
+ return lhs.second > rhs.second ||
+ (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ };
+
+ std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+ std::move(gt));
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ while (true) {
+ const ValueCountPair& value_count = gen();
+ DCHECK_NE(value_count.second, 0);
+ if (value_count.second < 0) break; // EOF reached
+ if (static_cast<int64_t>(min_heap.size()) < options.n) {
+ min_heap.push(value_count);
+ } else if (gt(value_count, min_heap.top())) {
+ min_heap.pop();
+ min_heap.push(value_count);
}
}
+ const int64_t n = min_heap.size();
- return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
- // see https://issues.apache.org/jira/browse/ARROW-9873
- static constexpr int kMinArraySize = 8192 / sizeof(CType);
- static constexpr int kMaxValueRange = 16384;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if ((array.length() - array.null_count()) >= kMinArraySize) {
- CType min = std::numeric_limits<CType>::max();
- CType max = std::numeric_limits<CType>::min();
-
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- min = std::min(min, value);
- max = std::max(max, value);
- }
- });
-
- if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
- return CountValuesByVector(array, min, max);
- }
- }
- return CountValuesByMap(array);
-}
+ CType* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<InType>(n, ctx, out));
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- // we need just count ones and zeros
- CounterMap<CType> map;
- if (array.length() > array.null_count()) {
- map[true] = array.true_count();
- map[false] = array.length() - array.null_count() - map[true];
+ for (int64_t i = n - 1; i >= 0; --i) {
+ std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+ min_heap.pop();
}
- nan_count = 0;
- return map;
}
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- using Limits = std::numeric_limits<CType>;
- nan_count = 0;
- return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurances for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+ using CType = typename T::c_type;
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMapOrVector(array);
-}
+ CType min;
+ std::vector<int64_t> counts;
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>> // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMap(array, nan_count);
-}
+ CountModer(CType min, CType max) {
+ uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+ DCHECK_LT(value_range, 1 << 20);
+ this->min = min;
+ this->counts.resize(value_range, 0);
+ }
-template <typename ArrowType>
-struct ModeState {
- using ThisType = ModeState<ArrowType>;
- using CType = typename ArrowType::c_type;
-
- void MergeFrom(ThisType&& state) {
- if (this->value_counts.empty()) {
- this->value_counts = std::move(state.value_counts);
- } else {
- for (const auto& value_count : state.value_counts) {
- auto value = value_count.first;
- auto count = value_count.second;
- this->value_counts[value] += count;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // count values in all chunks, ignore nulls
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ ++this->counts[values[pos + i] - this->min];
+ }
+ });
}
}
- if (is_floating_type<ArrowType>::value) {
- this->nan_count += state.nan_count;
- }
- }
-
- // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
- void Finalize(CType* modes, int64_t* counts, const int64_t n) {
- DCHECK(n >= 1 && n <= this->DistinctValues());
- // mode 'greater than' comparator: larger count or same count with smaller value
- using ValueCountPair = std::pair<CType, int64_t>;
- auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
- const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
- return lhs.second > rhs.second ||
- (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ // generator to emit next value:count pair
+ int index = 0;
+ auto gen = [&]() {
+ for (; index < static_cast<int>(counts.size()); ++index) {
+ if (counts[index] != 0) {
+ auto value_count =
+ std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+ ++index;
+ return value_count;
+ }
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
};
- // initialize min-heap with first n modes
- std::vector<ValueCountPair> vector(n);
- // push nan if exists
- const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
- if (has_nan) {
- vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
- }
- // push n or n-1 modes
- auto it = this->value_counts.cbegin();
- for (int i = has_nan; i < n; ++i) {
- vector[i] = *it++;
- }
- // turn to min-heap
- std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
- min_heap(std::move(mode_gt), std::move(vector));
-
- // iterate and insert modes into min-heap
- // - mode < heap top: ignore mode
- // - mode > heap top: discard heap top, insert mode
- for (; it != this->value_counts.cend(); ++it) {
- if (mode_gt(*it, min_heap.top())) {
- min_heap.pop();
- min_heap.push(*it);
- }
+ Finalize<T>(ctx, out, std::move(gen));
+ }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ int64_t counts[2]{};
+
+ const Datum& datum = batch[0];
+ for (const auto& array : datum.chunks()) {
+ if (array->length() > array->null_count()) {
+ const int64_t true_count =
+ checked_pointer_cast<BooleanArray>(array)->true_count();
+ const int64_t false_count = array->length() - array->null_count() - true_count;
+ counts[true] += true_count;
+ counts[false] += false_count;
+ };
}
- // pop modes from min-heap and insert into output array (in reverse order)
- DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
- for (int64_t i = n - 1; i >= 0; --i) {
- std::tie(modes[i], counts[i]) = min_heap.top();
- min_heap.pop();
+ const ModeOptions& options = ModeState::Get(ctx);
+ const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+ const int64_t n = std::min(options.n, distinct_values);
+
+ bool* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<BooleanType>(n, ctx, out));
+
+ if (n >= 1) {
+ const bool index = counts[1] > counts[0];
+ mode_buffer[0] = index;
+ count_buffer[0] = counts[index];
+ if (n == 2) {
+ mode_buffer[1] = !index;
+ count_buffer[1] = counts[!index];
+ }
}
}
+};
- int64_t DistinctValues() const {
- return this->value_counts.size() +
- (is_floating_type<ArrowType>::value && this->nan_count > 0);
- }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+ using CType = typename T::c_type;
+ using Allocator = arrow::stl::allocator<CType>;
- int64_t nan_count = 0; // only make sense to floating types
- CounterMap<CType> value_counts;
-};
+ int64_t nan_count = 0;
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
- using ThisType = ModeImpl<ArrowType>;
- using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
- using CType = typename ArrowType::c_type;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // copy all chunks to a buffer, ignore nulls and nans
+ std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
- ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : out_type(out_type), options(options) {}
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ in_buffer.resize(in_length);
- void Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.value_counts = CountValues(array, this->state.nan_count);
- }
+ int64_t index = 0;
+ for (const auto& array : datum.chunks()) {
+ index += CopyArray(in_buffer.data() + index, *array);
+ }
- void MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast<ThisType&>(src);
- this->state.MergeFrom(std::move(other.state));
- }
+ // drop nan
+ if (is_floating_type<T>::value) {
+ const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+ [](CType v) { return v != v; });
+ this->nan_count = in_buffer.end() - it;
+ in_buffer.resize(it - in_buffer.begin());
+ }
+ }
- static std::shared_ptr<ArrayData> MakeArrayData(
- const std::shared_ptr<DataType>& data_type, int64_t n) {
- auto data = ArrayData::Make(data_type, n, 0);
- data->buffers.resize(2);
- data->buffers[0] = nullptr;
- data->buffers[1] = nullptr;
- return data;
+ // sort the input data to count same values
+ std::sort(in_buffer.begin(), in_buffer.end());
+
+ // generator to emit next value:count pair
+ auto it = in_buffer.cbegin();
+ int64_t nan_count_copy = this->nan_count;
+ auto gen = [&]() {
+ if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+ // handle NAN at last
+ if (nan_count_copy > 0) {
+ auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+ nan_count_copy = 0;
+ return value_count;
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
+ }
+ // count same values
+ const CType value = *it;
+ int64_t count = 0;
+ do {
+ ++it;
+ ++count;
+ } while (it != in_buffer.cend() && *it == value);
+ return std::make_pair(value, count);
+ };
+
+ Finalize<T>(ctx, out, std::move(gen));
}
- void Finalize(KernelContext* ctx, Datum* out) override {
- const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
- const auto& count_type = int64();
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
- int64_t n = this->options.n;
- if (n > state.DistinctValues()) {
- n = state.DistinctValues();
- } else if (n < 0) {
- n = 0;
+ static int64_t CopyArray(CType* buffer, const Array& array) {
+ const int64_t n = array.length() - array.null_count();
+ if (n > 0) {
+ int64_t index = 0;
+ const ArrayData& data = *array.data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ memcpy(buffer + index, values + pos, len * sizeof(CType));
+ index += len;
+ });
+ DCHECK_EQ(index, n);
}
+ return n;
+ }
+};
- auto mode_data = this->MakeArrayData(mode_type, n);
- auto count_data = this->MakeArrayData(count_type, n);
- if (n > 0) {
- KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(CType)));
- KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(int64_t)));
- CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
- int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
- this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+ using CType = typename T::c_type;
+
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // cross point to benefit from counting approach
+ // about 2x improvement for int32/64 from micro-benchmarking
+ static constexpr int kMinArraySize = 8192;
+ static constexpr int kMaxValueRange = 32768;
+
+ const Datum& datum = batch[0];
+ if (datum.length() - datum.null_count() >= kMinArraySize) {
+ CType min = std::numeric_limits<CType>::max();
+ CType max = std::numeric_limits<CType>::min();
+
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ min = std::min(min, values[pos + i]);
+ max = std::max(max, values[pos + i]);
+ }
+ });
Review comment:
It looks like this operation (computing the min/max of a Datum or ArrayData) is common enough to be factored out into an internal header file?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612950895
##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
namespace {
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
constexpr char kModeFieldName[] = "mode";
constexpr char kCountFieldName[] = "count";
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array, int64_t& nan_count) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- nan_count = 0;
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- if (std::isnan(value)) {
- ++nan_count;
- } else {
- ++value_counts_map[value];
- }
- }
- });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+ Datum* out) {
+ const auto& mode_type = TypeTraits<InType>::type_singleton();
+ const auto& count_type = int64();
+
+ auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+ mode_data->buffers.resize(2, nullptr);
+ auto count_data = ArrayData::Make(count_type, n, 0);
+ count_data->buffers.resize(2, nullptr);
+
+ CType* mode_buffer = nullptr;
+ int64_t* count_buffer = nullptr;
+
+ if (n > 0) {
+ ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+ ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+ mode_buffer = mode_data->template GetMutableValues<CType>(1);
+ count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_map[values[pos + i]];
- }
- });
- }
+ const auto& out_type =
+ struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+ *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
- return value_counts_map;
+ return std::make_pair(mode_buffer, count_buffer);
}
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
- const int range = static_cast<int>(max - min);
- DCHECK(range >= 0 && range < 64 * 1024 * 1024);
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- std::vector<int64_t> value_counts_vector(range + 1);
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_vector[values[pos + i] - min];
- }
- });
- }
-
- // Transfer value counts to a map to be consistent with other chunks
- CounterMap<CType> value_counts_map(range + 1);
- for (int i = 0; i <= range; ++i) {
- CType value = static_cast<CType>(i + min);
- int64_t count = value_counts_vector[i];
- if (count) {
- value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+ using CType = typename InType::c_type;
+
+ using ValueCountPair = std::pair<CType, int64_t>;
+ auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+ const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
+ return lhs.second > rhs.second ||
+ (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ };
+
+ std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+ std::move(gt));
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ while (true) {
+ const ValueCountPair& value_count = gen();
+ DCHECK_NE(value_count.second, 0);
+ if (value_count.second < 0) break; // EOF reached
+ if (static_cast<int64_t>(min_heap.size()) < options.n) {
+ min_heap.push(value_count);
+ } else if (gt(value_count, min_heap.top())) {
+ min_heap.pop();
+ min_heap.push(value_count);
}
}
+ const int64_t n = min_heap.size();
- return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
- // see https://issues.apache.org/jira/browse/ARROW-9873
- static constexpr int kMinArraySize = 8192 / sizeof(CType);
- static constexpr int kMaxValueRange = 16384;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if ((array.length() - array.null_count()) >= kMinArraySize) {
- CType min = std::numeric_limits<CType>::max();
- CType max = std::numeric_limits<CType>::min();
-
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- min = std::min(min, value);
- max = std::max(max, value);
- }
- });
-
- if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
- return CountValuesByVector(array, min, max);
- }
- }
- return CountValuesByMap(array);
-}
+ CType* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<InType>(n, ctx, out));
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- // we need just count ones and zeros
- CounterMap<CType> map;
- if (array.length() > array.null_count()) {
- map[true] = array.true_count();
- map[false] = array.length() - array.null_count() - map[true];
+ for (int64_t i = n - 1; i >= 0; --i) {
+ std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+ min_heap.pop();
}
- nan_count = 0;
- return map;
}
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- using Limits = std::numeric_limits<CType>;
- nan_count = 0;
- return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurances for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+ using CType = typename T::c_type;
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMapOrVector(array);
-}
+ CType min;
+ std::vector<int64_t> counts;
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>> // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMap(array, nan_count);
-}
+ CountModer(CType min, CType max) {
+ uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+ DCHECK_LT(value_range, 1 << 20);
+ this->min = min;
+ this->counts.resize(value_range, 0);
+ }
-template <typename ArrowType>
-struct ModeState {
- using ThisType = ModeState<ArrowType>;
- using CType = typename ArrowType::c_type;
-
- void MergeFrom(ThisType&& state) {
- if (this->value_counts.empty()) {
- this->value_counts = std::move(state.value_counts);
- } else {
- for (const auto& value_count : state.value_counts) {
- auto value = value_count.first;
- auto count = value_count.second;
- this->value_counts[value] += count;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // count values in all chunks, ignore nulls
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ ++this->counts[values[pos + i] - this->min];
+ }
+ });
}
}
- if (is_floating_type<ArrowType>::value) {
- this->nan_count += state.nan_count;
- }
- }
-
- // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
- void Finalize(CType* modes, int64_t* counts, const int64_t n) {
- DCHECK(n >= 1 && n <= this->DistinctValues());
- // mode 'greater than' comparator: larger count or same count with smaller value
- using ValueCountPair = std::pair<CType, int64_t>;
- auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
- const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
- return lhs.second > rhs.second ||
- (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ // generator to emit next value:count pair
+ int index = 0;
+ auto gen = [&]() {
+ for (; index < static_cast<int>(counts.size()); ++index) {
+ if (counts[index] != 0) {
+ auto value_count =
+ std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+ ++index;
+ return value_count;
+ }
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
};
- // initialize min-heap with first n modes
- std::vector<ValueCountPair> vector(n);
- // push nan if exists
- const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
- if (has_nan) {
- vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
- }
- // push n or n-1 modes
- auto it = this->value_counts.cbegin();
- for (int i = has_nan; i < n; ++i) {
- vector[i] = *it++;
- }
- // turn to min-heap
- std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
- min_heap(std::move(mode_gt), std::move(vector));
-
- // iterate and insert modes into min-heap
- // - mode < heap top: ignore mode
- // - mode > heap top: discard heap top, insert mode
- for (; it != this->value_counts.cend(); ++it) {
- if (mode_gt(*it, min_heap.top())) {
- min_heap.pop();
- min_heap.push(*it);
- }
+ Finalize<T>(ctx, out, std::move(gen));
+ }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ int64_t counts[2]{};
+
+ const Datum& datum = batch[0];
+ for (const auto& array : datum.chunks()) {
+ if (array->length() > array->null_count()) {
+ const int64_t true_count =
+ checked_pointer_cast<BooleanArray>(array)->true_count();
+ const int64_t false_count = array->length() - array->null_count() - true_count;
+ counts[true] += true_count;
+ counts[false] += false_count;
+ };
}
- // pop modes from min-heap and insert into output array (in reverse order)
- DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
- for (int64_t i = n - 1; i >= 0; --i) {
- std::tie(modes[i], counts[i]) = min_heap.top();
- min_heap.pop();
+ const ModeOptions& options = ModeState::Get(ctx);
+ const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+ const int64_t n = std::min(options.n, distinct_values);
+
+ bool* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<BooleanType>(n, ctx, out));
+
+ if (n >= 1) {
+ const bool index = counts[1] > counts[0];
+ mode_buffer[0] = index;
+ count_buffer[0] = counts[index];
+ if (n == 2) {
+ mode_buffer[1] = !index;
+ count_buffer[1] = counts[!index];
+ }
}
}
+};
- int64_t DistinctValues() const {
- return this->value_counts.size() +
- (is_floating_type<ArrowType>::value && this->nan_count > 0);
- }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+ using CType = typename T::c_type;
+ using Allocator = arrow::stl::allocator<CType>;
- int64_t nan_count = 0; // only make sense to floating types
- CounterMap<CType> value_counts;
-};
+ int64_t nan_count = 0;
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
- using ThisType = ModeImpl<ArrowType>;
- using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
- using CType = typename ArrowType::c_type;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // copy all chunks to a buffer, ignore nulls and nans
+ std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
- ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : out_type(out_type), options(options) {}
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ in_buffer.resize(in_length);
- void Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.value_counts = CountValues(array, this->state.nan_count);
- }
+ int64_t index = 0;
+ for (const auto& array : datum.chunks()) {
+ index += CopyArray(in_buffer.data() + index, *array);
+ }
- void MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast<ThisType&>(src);
- this->state.MergeFrom(std::move(other.state));
- }
+ // drop nan
+ if (is_floating_type<T>::value) {
+ const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+ [](CType v) { return v != v; });
+ this->nan_count = in_buffer.end() - it;
+ in_buffer.resize(it - in_buffer.begin());
+ }
+ }
- static std::shared_ptr<ArrayData> MakeArrayData(
- const std::shared_ptr<DataType>& data_type, int64_t n) {
- auto data = ArrayData::Make(data_type, n, 0);
- data->buffers.resize(2);
- data->buffers[0] = nullptr;
- data->buffers[1] = nullptr;
- return data;
+ // sort the input data to count same values
+ std::sort(in_buffer.begin(), in_buffer.end());
+
+ // generator to emit next value:count pair
+ auto it = in_buffer.cbegin();
+ int64_t nan_count_copy = this->nan_count;
+ auto gen = [&]() {
+ if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+ // handle NAN at last
+ if (nan_count_copy > 0) {
+ auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+ nan_count_copy = 0;
+ return value_count;
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
+ }
+ // count same values
+ const CType value = *it;
+ int64_t count = 0;
+ do {
+ ++it;
+ ++count;
+ } while (it != in_buffer.cend() && *it == value);
+ return std::make_pair(value, count);
+ };
+
+ Finalize<T>(ctx, out, std::move(gen));
}
- void Finalize(KernelContext* ctx, Datum* out) override {
- const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
- const auto& count_type = int64();
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
- int64_t n = this->options.n;
- if (n > state.DistinctValues()) {
- n = state.DistinctValues();
- } else if (n < 0) {
- n = 0;
+ static int64_t CopyArray(CType* buffer, const Array& array) {
+ const int64_t n = array.length() - array.null_count();
+ if (n > 0) {
+ int64_t index = 0;
+ const ArrayData& data = *array.data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ memcpy(buffer + index, values + pos, len * sizeof(CType));
+ index += len;
+ });
+ DCHECK_EQ(index, n);
}
+ return n;
+ }
+};
- auto mode_data = this->MakeArrayData(mode_type, n);
- auto count_data = this->MakeArrayData(count_type, n);
- if (n > 0) {
- KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(CType)));
- KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(int64_t)));
- CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
- int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
- this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+ using CType = typename T::c_type;
+
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // cross point to benefit from counting approach
+ // about 2x improvement for int32/64 from micro-benchmarking
+ static constexpr int kMinArraySize = 8192;
+ static constexpr int kMaxValueRange = 32768;
+
+ const Datum& datum = batch[0];
+ if (datum.length() - datum.null_count() >= kMinArraySize) {
+ CType min = std::numeric_limits<CType>::max();
+ CType max = std::numeric_limits<CType>::min();
+
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ min = std::min(min, values[pos + i]);
+ max = std::max(max, values[pos + i]);
+ }
+ });
Review comment:
Will do.
The Quantile and Mode kernels share a lot of common code; I will factor that out as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818855204
Benchmarks on Zen 2, clang 10.
```
----------------------------------------------------------------------------------------------
Non-regressions: (44)
----------------------------------------------------------------------------------------------
benchmark baseline contender change % counters
ModeKernelNarrow<Int64Type>/1048576/10000 1.344 GiB/sec 3.077 GiB/sec 129.033 {}
ModeKernelWide<FloatType>/1048576/0 26.692 MiB/sec 60.554 MiB/sec 126.860 {}
ModeKernelWide<FloatType>/1048576/100 26.720 MiB/sec 60.031 MiB/sec 124.667 {}
ModeKernelWide<FloatType>/1048576/10000 26.707 MiB/sec 59.770 MiB/sec 123.799 {}
ModeKernelWide<Int32Type>/1048576/0 35.031 MiB/sec 77.918 MiB/sec 122.426 {}
ModeKernelNarrow<Int64Type>/1048576/0 1.399 GiB/sec 3.091 GiB/sec 120.934 {}
ModeKernelWide<Int32Type>/1048576/10000 34.942 MiB/sec 77.128 MiB/sec 120.729 {}
ModeKernelWide<DoubleType>/1048576/100 57.929 MiB/sec 127.702 MiB/sec 120.447 {}
ModeKernelNarrow<Int64Type>/1048576/100 1.328 GiB/sec 2.874 GiB/sec 116.482 {}
ModeKernelWide<DoubleType>/1048576/10000 58.316 MiB/sec 125.540 MiB/sec 115.276 {}
ModeKernelWide<DoubleType>/1048576/0 58.070 MiB/sec 122.623 MiB/sec 111.164 {}
ModeKernelWide<FloatType>/1048576/2 55.623 MiB/sec 117.360 MiB/sec 110.994 {}
ModeKernelWide<DoubleType>/1048576/2 116.593 MiB/sec 243.300 MiB/sec 108.675 {}
ModeKernelWide<Int64Type>/1048576/100 79.077 MiB/sec 162.463 MiB/sec 105.449 {}
ModeKernelWide<Int64Type>/1048576/10000 78.890 MiB/sec 158.923 MiB/sec 101.450 {}
ModeKernelNarrow<Int64Type>/1048576/10 994.277 MiB/sec 1.895 GiB/sec 95.196 {}
ModeKernelWide<Int32Type>/1048576/2 70.771 MiB/sec 136.449 MiB/sec 92.804 {}
ModeKernelWide<Int64Type>/1048576/0 79.140 MiB/sec 150.926 MiB/sec 90.709 {}
ModeKernelWide<DoubleType>/1048576/10 72.236 MiB/sec 136.781 MiB/sec 89.354 {}
ModeKernelWide<Int64Type>/1048576/2 149.834 MiB/sec 282.411 MiB/sec 88.482 {}
ModeKernelWide<FloatType>/1048576/10 34.595 MiB/sec 65.197 MiB/sec 88.457 {}
ModeKernelWide<Int32Type>/1048576/10 44.789 MiB/sec 82.908 MiB/sec 85.107 {}
ModeKernelWide<Int32Type>/1048576/100 41.497 MiB/sec 76.746 MiB/sec 84.946 {}
ModeKernelNarrow<Int64Type>/1048576/2 799.410 MiB/sec 1.407 GiB/sec 80.226 {}
ModeKernelWide<Int64Type>/1048576/10 94.448 MiB/sec 168.857 MiB/sec 78.782 {}
ModeKernelNarrow<Int32Type>/1048576/0 1.099 GiB/sec 1.709 GiB/sec 55.441 {}
ModeKernelNarrow<Int32Type>/1048576/10 700.749 MiB/sec 1.059 GiB/sec 54.802 {}
ModeKernelNarrow<Int32Type>/1048576/2 500.474 MiB/sec 727.773 MiB/sec 45.417 {}
ModeKernelNarrow<Int32Type>/1048576/100 1.083 GiB/sec 1.529 GiB/sec 41.151 {}
ModeKernelNarrow<Int32Type>/1048576/10000 1.258 GiB/sec 1.658 GiB/sec 31.790 {}
ModeKernelNarrow<Int8Type>/1048576/1 504.487 GiB/sec 625.852 GiB/sec 24.057 {}
ModeKernelWide<Int32Type>/1048576/1 584.030 GiB/sec 704.471 GiB/sec 20.622 {}
ModeKernelNarrow<Int32Type>/1048576/1 595.824 GiB/sec 712.818 GiB/sec 19.636 {}
ModeKernelNarrow<Int64Type>/1048576/1 597.643 GiB/sec 707.585 GiB/sec 18.396 {}
ModeKernelWide<FloatType>/1048576/1 582.116 GiB/sec 679.811 GiB/sec 16.783 {}
ModeKernelWide<DoubleType>/1048576/1 583.923 GiB/sec 674.269 GiB/sec 15.472 {}
ModeKernelWide<Int64Type>/1048576/1 590.544 GiB/sec 678.023 GiB/sec 14.813 {}
ModeKernelNarrow<BooleanType>/1048576/1 593.884 GiB/sec 648.047 GiB/sec 9.120 {}
ModeKernelNarrow<BooleanType>/1048576/0 36.364 GiB/sec 38.774 GiB/sec 6.626 {}
ModeKernelNarrow<Int8Type>/1048576/2 366.079 MiB/sec 378.478 MiB/sec 3.387 {}
ModeKernelNarrow<BooleanType>/1048576/10 2.393 GiB/sec 2.412 GiB/sec 0.784 {}
ModeKernelNarrow<BooleanType>/1048576/10000 2.392 GiB/sec 2.395 GiB/sec 0.115 {}
ModeKernelNarrow<BooleanType>/1048576/2 2.393 GiB/sec 2.391 GiB/sec -0.071 {}
ModeKernelNarrow<BooleanType>/1048576/100 2.395 GiB/sec 2.369 GiB/sec -1.084 {}
-------------------------------------------------------------------------------------------
Regressions: (4)
-------------------------------------------------------------------------------------------
benchmark baseline contender change % counters
ModeKernelNarrow<Int8Type>/1048576/0 517.607 MiB/sec 477.913 MiB/sec -7.669 {}
ModeKernelNarrow<Int8Type>/1048576/10000 597.905 MiB/sec 473.417 MiB/sec -20.821 {}
ModeKernelNarrow<Int8Type>/1048576/10 616.529 MiB/sec 486.274 MiB/sec -21.127 {}
ModeKernelNarrow<Int8Type>/1048576/100 652.528 MiB/sec 478.726 MiB/sec -26.635 {}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613725348
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -492,16 +493,8 @@ class ArrayCountOrCompareSorter {
uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
int64_t offset, const ArraySortOptions& options) {
if (values.length() >= countsort_min_len_ && values.length() > values.null_count()) {
- c_type min{std::numeric_limits<c_type>::max()};
- c_type max{std::numeric_limits<c_type>::min()};
-
- VisitRawValuesInline(
- values,
- [&](c_type v) {
- min = std::min(min, v);
- max = std::max(max, v);
- },
- []() {});
+ c_type min, max;
+ std::tie(min, max) = GetMinMax<c_type>(*values.data());
Review comment:
A small performance improvement for Int64Narrow sorting.
```
benchmark baseline contender change %
ArraySortIndicesInt64Narrow/32768/2 507.257 MiB/sec 632.995 MiB/sec 24.788
ArraySortIndicesInt64Narrow/32768/10 643.182 MiB/sec 724.483 MiB/sec 12.640
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] github-actions[bot] commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818629589
https://issues.apache.org/jira/browse/ARROW-11568
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r613725348
##########
File path: cpp/src/arrow/compute/kernels/vector_sort.cc
##########
@@ -492,16 +493,8 @@ class ArrayCountOrCompareSorter {
uint64_t* Sort(uint64_t* indices_begin, uint64_t* indices_end, const ArrayType& values,
int64_t offset, const ArraySortOptions& options) {
if (values.length() >= countsort_min_len_ && values.length() > values.null_count()) {
- c_type min{std::numeric_limits<c_type>::max()};
- c_type max{std::numeric_limits<c_type>::min()};
-
- VisitRawValuesInline(
- values,
- [&](c_type v) {
- min = std::min(min, v);
- max = std::max(max, v);
- },
- []() {});
+ c_type min, max;
+ std::tie(min, max) = GetMinMax<c_type>(*values.data());
Review comment:
Small performance improvement for int64narrow sorting.
```
benchmark baseline contender change %
ArraySortIndicesInt64Narrow/32768/2 507.257 MiB/sec 632.995 MiB/sec 24.788
ArraySortIndicesInt64Narrow/32768/10 643.182 MiB/sec 724.483 MiB/sec 12.640
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou commented on a change in pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#discussion_r612600080
##########
File path: cpp/src/arrow/compute/kernels/aggregate_mode.cc
##########
@@ -31,340 +33,359 @@ namespace internal {
namespace {
+using arrow::internal::checked_pointer_cast;
+using arrow::internal::VisitSetBitRunsVoid;
+
+using ModeState = OptionsWrapper<ModeOptions>;
+
constexpr char kModeFieldName[] = "mode";
constexpr char kCountFieldName[] = "count";
-// {value:count} map
-template <typename CType>
-using CounterMap = std::unordered_map<CType, int64_t>;
-
-// map based counter for floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array, int64_t& nan_count) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- nan_count = 0;
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- if (std::isnan(value)) {
- ++nan_count;
- } else {
- ++value_counts_map[value];
- }
- }
- });
+template <typename InType, typename CType = typename InType::c_type>
+Result<std::pair<CType*, int64_t*>> PrepareOutput(int64_t n, KernelContext* ctx,
+ Datum* out) {
+ const auto& mode_type = TypeTraits<InType>::type_singleton();
+ const auto& count_type = int64();
+
+ auto mode_data = ArrayData::Make(mode_type, /*length=*/n, /*null_count=*/0);
+ mode_data->buffers.resize(2, nullptr);
+ auto count_data = ArrayData::Make(count_type, n, 0);
+ count_data->buffers.resize(2, nullptr);
+
+ CType* mode_buffer = nullptr;
+ int64_t* count_buffer = nullptr;
+
+ if (n > 0) {
+ ARROW_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx->Allocate(n * sizeof(CType)));
+ ARROW_ASSIGN_OR_RAISE(count_data->buffers[1], ctx->Allocate(n * sizeof(int64_t)));
+ mode_buffer = mode_data->template GetMutableValues<CType>(1);
+ count_buffer = count_data->template GetMutableValues<int64_t>(1);
}
- return value_counts_map;
-}
-
-// map base counter for non floating points
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<!std::is_floating_point<CType>::value, CounterMap<CType>> CountValuesByMap(
- const ArrayType& array) {
- CounterMap<CType> value_counts_map;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_map[values[pos + i]];
- }
- });
- }
+ const auto& out_type =
+ struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
+ *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
- return value_counts_map;
+ return std::make_pair(mode_buffer, count_buffer);
}
-// vector based counter for int8 or integers with small value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByVector(const ArrayType& array, CType min, CType max) {
- const int range = static_cast<int>(max - min);
- DCHECK(range >= 0 && range < 64 * 1024 * 1024);
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- std::vector<int64_t> value_counts_vector(range + 1);
- if (array.length() > array.null_count()) {
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- ++value_counts_vector[values[pos + i] - min];
- }
- });
- }
-
- // Transfer value counts to a map to be consistent with other chunks
- CounterMap<CType> value_counts_map(range + 1);
- for (int i = 0; i <= range; ++i) {
- CType value = static_cast<CType>(i + min);
- int64_t count = value_counts_vector[i];
- if (count) {
- value_counts_map[value] = count;
+// find top-n value:count pairs with minimal heap
+// suboptimal for tiny or large n, possibly okay as we're not in hot path
+template <typename InType, typename Generator>
+void Finalize(KernelContext* ctx, Datum* out, Generator&& gen) {
+ using CType = typename InType::c_type;
+
+ using ValueCountPair = std::pair<CType, int64_t>;
+ auto gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
+ const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
+ return lhs.second > rhs.second ||
+ (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ };
+
+ std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(gt)> min_heap(
+ std::move(gt));
+
+ const ModeOptions& options = ModeState::Get(ctx);
+ while (true) {
+ const ValueCountPair& value_count = gen();
+ DCHECK_NE(value_count.second, 0);
+ if (value_count.second < 0) break; // EOF reached
+ if (static_cast<int64_t>(min_heap.size()) < options.n) {
+ min_heap.push(value_count);
+ } else if (gt(value_count, min_heap.top())) {
+ min_heap.pop();
+ min_heap.push(value_count);
}
}
+ const int64_t n = min_heap.size();
- return value_counts_map;
-}
-
-// map or vector based counter for int16/32/64 per value range
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-CounterMap<CType> CountValuesByMapOrVector(const ArrayType& array) {
- // see https://issues.apache.org/jira/browse/ARROW-9873
- static constexpr int kMinArraySize = 8192 / sizeof(CType);
- static constexpr int kMaxValueRange = 16384;
- const ArrayData& data = *array.data();
- const CType* values = data.GetValues<CType>(1);
-
- if ((array.length() - array.null_count()) >= kMinArraySize) {
- CType min = std::numeric_limits<CType>::max();
- CType max = std::numeric_limits<CType>::min();
-
- arrow::internal::VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
- [&](int64_t pos, int64_t len) {
- for (int64_t i = 0; i < len; ++i) {
- const auto value = values[pos + i];
- min = std::min(min, value);
- max = std::max(max, value);
- }
- });
-
- if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
- return CountValuesByVector(array, min, max);
- }
- }
- return CountValuesByMap(array);
-}
+ CType* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<InType>(n, ctx, out));
-// bool
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_boolean_type<typename ArrayType::TypeClass>::value, CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- // we need just count ones and zeros
- CounterMap<CType> map;
- if (array.length() > array.null_count()) {
- map[true] = array.true_count();
- map[false] = array.length() - array.null_count() - map[true];
+ for (int64_t i = n - 1; i >= 0; --i) {
+ std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top();
+ min_heap.pop();
}
- nan_count = 0;
- return map;
}
-// int8
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && sizeof(CType) == 1,
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- using Limits = std::numeric_limits<CType>;
- nan_count = 0;
- return CountValuesByVector(array, Limits::min(), Limits::max());
-}
+// count value occurances for integers with narrow value range
+// O(1) space, O(n) time
+template <typename T>
+struct CountModer {
+ using CType = typename T::c_type;
-// int16/32/64
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<is_integer_type<typename ArrayType::TypeClass>::value && (sizeof(CType) > 1),
- CounterMap<CType>>
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMapOrVector(array);
-}
+ CType min;
+ std::vector<int64_t> counts;
-// float/double
-template <typename ArrayType, typename CType = typename ArrayType::TypeClass::c_type>
-enable_if_t<(std::is_floating_point<CType>::value), CounterMap<CType>> // NOLINT format
-CountValues(const ArrayType& array, int64_t& nan_count) {
- nan_count = 0;
- return CountValuesByMap(array, nan_count);
-}
+ CountModer(CType min, CType max) {
+ uint32_t value_range = static_cast<uint32_t>(max - min) + 1;
+ DCHECK_LT(value_range, 1 << 20);
+ this->min = min;
+ this->counts.resize(value_range, 0);
+ }
-template <typename ArrowType>
-struct ModeState {
- using ThisType = ModeState<ArrowType>;
- using CType = typename ArrowType::c_type;
-
- void MergeFrom(ThisType&& state) {
- if (this->value_counts.empty()) {
- this->value_counts = std::move(state.value_counts);
- } else {
- for (const auto& value_count : state.value_counts) {
- auto value = value_count.first;
- auto count = value_count.second;
- this->value_counts[value] += count;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // count values in all chunks, ignore nulls
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ ++this->counts[values[pos + i] - this->min];
+ }
+ });
}
}
- if (is_floating_type<ArrowType>::value) {
- this->nan_count += state.nan_count;
- }
- }
-
- // find top-n value/count pairs with min-heap (priority queue with '>' comparator)
- void Finalize(CType* modes, int64_t* counts, const int64_t n) {
- DCHECK(n >= 1 && n <= this->DistinctValues());
- // mode 'greater than' comparator: larger count or same count with smaller value
- using ValueCountPair = std::pair<CType, int64_t>;
- auto mode_gt = [](const ValueCountPair& lhs, const ValueCountPair& rhs) {
- const bool rhs_is_nan = rhs.first != rhs.first; // nan as largest value
- return lhs.second > rhs.second ||
- (lhs.second == rhs.second && (lhs.first < rhs.first || rhs_is_nan));
+ // generator to emit next value:count pair
+ int index = 0;
+ auto gen = [&]() {
+ for (; index < static_cast<int>(counts.size()); ++index) {
+ if (counts[index] != 0) {
+ auto value_count =
+ std::make_pair(static_cast<CType>(index + this->min), counts[index]);
+ ++index;
+ return value_count;
+ }
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
};
- // initialize min-heap with first n modes
- std::vector<ValueCountPair> vector(n);
- // push nan if exists
- const bool has_nan = is_floating_type<ArrowType>::value && this->nan_count > 0;
- if (has_nan) {
- vector[0] = std::make_pair(static_cast<CType>(NAN), this->nan_count);
- }
- // push n or n-1 modes
- auto it = this->value_counts.cbegin();
- for (int i = has_nan; i < n; ++i) {
- vector[i] = *it++;
- }
- // turn to min-heap
- std::priority_queue<ValueCountPair, std::vector<ValueCountPair>, decltype(mode_gt)>
- min_heap(std::move(mode_gt), std::move(vector));
-
- // iterate and insert modes into min-heap
- // - mode < heap top: ignore mode
- // - mode > heap top: discard heap top, insert mode
- for (; it != this->value_counts.cend(); ++it) {
- if (mode_gt(*it, min_heap.top())) {
- min_heap.pop();
- min_heap.push(*it);
- }
+ Finalize<T>(ctx, out, std::move(gen));
+ }
+};
+
+// booleans can be handled more straightforward
+template <>
+struct CountModer<BooleanType> {
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ int64_t counts[2]{};
+
+ const Datum& datum = batch[0];
+ for (const auto& array : datum.chunks()) {
+ if (array->length() > array->null_count()) {
+ const int64_t true_count =
+ checked_pointer_cast<BooleanArray>(array)->true_count();
+ const int64_t false_count = array->length() - array->null_count() - true_count;
+ counts[true] += true_count;
+ counts[false] += false_count;
+ };
}
- // pop modes from min-heap and insert into output array (in reverse order)
- DCHECK_EQ(min_heap.size(), static_cast<size_t>(n));
- for (int64_t i = n - 1; i >= 0; --i) {
- std::tie(modes[i], counts[i]) = min_heap.top();
- min_heap.pop();
+ const ModeOptions& options = ModeState::Get(ctx);
+ const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0);
+ const int64_t n = std::min(options.n, distinct_values);
+
+ bool* mode_buffer;
+ int64_t* count_buffer;
+ KERNEL_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), ctx,
+ PrepareOutput<BooleanType>(n, ctx, out));
+
+ if (n >= 1) {
+ const bool index = counts[1] > counts[0];
+ mode_buffer[0] = index;
+ count_buffer[0] = counts[index];
+ if (n == 2) {
+ mode_buffer[1] = !index;
+ count_buffer[1] = counts[!index];
+ }
}
}
+};
- int64_t DistinctValues() const {
- return this->value_counts.size() +
- (is_floating_type<ArrowType>::value && this->nan_count > 0);
- }
+// copy and sort approach for floating points or integers with wide value range
+// O(n) space, O(nlogn) time
+template <typename T>
+struct SortModer {
+ using CType = typename T::c_type;
+ using Allocator = arrow::stl::allocator<CType>;
- int64_t nan_count = 0; // only make sense to floating types
- CounterMap<CType> value_counts;
-};
+ int64_t nan_count = 0;
-template <typename ArrowType>
-struct ModeImpl : public ScalarAggregator {
- using ThisType = ModeImpl<ArrowType>;
- using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
- using CType = typename ArrowType::c_type;
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // copy all chunks to a buffer, ignore nulls and nans
+ std::vector<CType, Allocator> in_buffer(Allocator(ctx->memory_pool()));
- ModeImpl(const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : out_type(out_type), options(options) {}
+ const Datum& datum = batch[0];
+ const int64_t in_length = datum.length() - datum.null_count();
+ if (in_length > 0) {
+ in_buffer.resize(in_length);
- void Consume(KernelContext*, const ExecBatch& batch) override {
- ArrayType array(batch[0].array());
- this->state.value_counts = CountValues(array, this->state.nan_count);
- }
+ int64_t index = 0;
+ for (const auto& array : datum.chunks()) {
+ index += CopyArray(in_buffer.data() + index, *array);
+ }
- void MergeFrom(KernelContext*, KernelState&& src) override {
- auto& other = checked_cast<ThisType&>(src);
- this->state.MergeFrom(std::move(other.state));
- }
+ // drop nan
+ if (is_floating_type<T>::value) {
+ const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(),
+ [](CType v) { return v != v; });
+ this->nan_count = in_buffer.end() - it;
+ in_buffer.resize(it - in_buffer.begin());
+ }
+ }
- static std::shared_ptr<ArrayData> MakeArrayData(
- const std::shared_ptr<DataType>& data_type, int64_t n) {
- auto data = ArrayData::Make(data_type, n, 0);
- data->buffers.resize(2);
- data->buffers[0] = nullptr;
- data->buffers[1] = nullptr;
- return data;
+ // sort the input data to count same values
+ std::sort(in_buffer.begin(), in_buffer.end());
+
+ // generator to emit next value:count pair
+ auto it = in_buffer.cbegin();
+ int64_t nan_count_copy = this->nan_count;
+ auto gen = [&]() {
+ if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) {
+ // handle NAN at last
+ if (nan_count_copy > 0) {
+ auto value_count = std::make_pair(static_cast<CType>(NAN), nan_count_copy);
+ nan_count_copy = 0;
+ return value_count;
+ }
+ return std::make_pair<CType, int64_t>(0, -1); // EOF
+ }
+ // count same values
+ const CType value = *it;
+ int64_t count = 0;
+ do {
+ ++it;
+ ++count;
+ } while (it != in_buffer.cend() && *it == value);
+ return std::make_pair(value, count);
+ };
+
+ Finalize<T>(ctx, out, std::move(gen));
}
- void Finalize(KernelContext* ctx, Datum* out) override {
- const auto& mode_type = TypeTraits<ArrowType>::type_singleton();
- const auto& count_type = int64();
- const auto& out_type =
- struct_({field(kModeFieldName, mode_type), field(kCountFieldName, count_type)});
-
- int64_t n = this->options.n;
- if (n > state.DistinctValues()) {
- n = state.DistinctValues();
- } else if (n < 0) {
- n = 0;
+ static int64_t CopyArray(CType* buffer, const Array& array) {
+ const int64_t n = array.length() - array.null_count();
+ if (n > 0) {
+ int64_t index = 0;
+ const ArrayData& data = *array.data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ memcpy(buffer + index, values + pos, len * sizeof(CType));
+ index += len;
+ });
+ DCHECK_EQ(index, n);
}
+ return n;
+ }
+};
- auto mode_data = this->MakeArrayData(mode_type, n);
- auto count_data = this->MakeArrayData(count_type, n);
- if (n > 0) {
- KERNEL_ASSIGN_OR_RAISE(mode_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(CType)));
- KERNEL_ASSIGN_OR_RAISE(count_data->buffers[1], ctx,
- ctx->Allocate(n * sizeof(int64_t)));
- CType* mode_buffer = mode_data->template GetMutableValues<CType>(1);
- int64_t* count_buffer = count_data->template GetMutableValues<int64_t>(1);
- this->state.Finalize(mode_buffer, count_buffer, n);
+// pick counting or sorting approach per integers value range
+template <typename T>
+struct CountOrSortModer {
+ using CType = typename T::c_type;
+
+ void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ // cross point to benefit from counting approach
+ // about 2x improvement for int32/64 from micro-benchmarking
+ static constexpr int kMinArraySize = 8192;
+ static constexpr int kMaxValueRange = 32768;
+
+ const Datum& datum = batch[0];
+ if (datum.length() - datum.null_count() >= kMinArraySize) {
+ CType min = std::numeric_limits<CType>::max();
+ CType max = std::numeric_limits<CType>::min();
+
+ for (const auto& array : datum.chunks()) {
+ const ArrayData& data = *array->data();
+ const CType* values = data.GetValues<CType>(1);
+ VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length,
+ [&](int64_t pos, int64_t len) {
+ for (int64_t i = 0; i < len; ++i) {
+ min = std::min(min, values[pos + i]);
+ max = std::max(max, values[pos + i]);
+ }
+ });
+ }
+
+ if (static_cast<uint64_t>(max) - static_cast<uint64_t>(min) <= kMaxValueRange) {
+ CountModer<T>(min, max).Exec(ctx, batch, out);
+ return;
+ }
}
- *out = Datum(ArrayData::Make(out_type, n, {nullptr}, {mode_data, count_data}, 0));
+ SortModer<T>().Exec(ctx, batch, out);
}
+};
- std::shared_ptr<DataType> out_type;
- ModeState<ArrowType> state;
- ModeOptions options;
+template <typename InType, typename Enable = void>
+struct Moder;
+
+template <>
+struct Moder<Int8Type> {
+ CountModer<Int8Type> impl;
+ Moder() : impl(-128, 127) {}
};
-struct ModeInitState {
- std::unique_ptr<KernelState> state;
- KernelContext* ctx;
- const DataType& in_type;
- const std::shared_ptr<DataType>& out_type;
- const ModeOptions& options;
+template <>
+struct Moder<UInt8Type> {
+ CountModer<UInt8Type> impl;
+ Moder() : impl(0, 255) {}
+};
- ModeInitState(KernelContext* ctx, const DataType& in_type,
- const std::shared_ptr<DataType>& out_type, const ModeOptions& options)
- : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}
+template <>
+struct Moder<BooleanType> {
+ CountModer<BooleanType> impl;
+};
- Status Visit(const DataType&) { return Status::NotImplemented("No mode implemented"); }
+template <typename InType>
+struct Moder<InType, enable_if_t<(is_integer_type<InType>::value &&
+ (sizeof(typename InType::c_type) > 1))>> {
+ CountOrSortModer<InType> impl;
+};
- Status Visit(const HalfFloatType&) {
- return Status::NotImplemented("No mode implemented");
- }
+template <typename InType>
+struct Moder<InType, enable_if_t<is_floating_type<InType>::value>> {
+ SortModer<InType> impl;
+};
- template <typename Type>
- enable_if_t<is_number_type<Type>::value || is_boolean_type<Type>::value, Status> Visit(
- const Type&) {
- state.reset(new ModeImpl<Type>(out_type, options));
- return Status::OK();
- }
+template <typename _, typename InType>
+struct ModeExecutor {
+ static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+ if (ctx->state() == nullptr) {
+ ctx->SetStatus(Status::Invalid("Mode requires ModeOptions"));
+ return;
+ }
+ const ModeOptions& options = ModeState::Get(ctx);
+ if (options.n <= 0) {
+ ctx->SetStatus(Status::Invalid("ModeOption::n must be positive"));
Review comment:
"strictly positive" or "> 0"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-819167274
> Do the existing tests exercise both the narrow and wide cases?
Yes. https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/aggregate_test.cc#L1075-L1082
I did find a comment that should be updated: `hashmap-based` -> `sorter-based`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] pitrou closed pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10009:
URL: https://github.com/apache/arrow/pull/10009
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] cyb70289 commented on pull request #10009: ARROW-11568: [C++][Compute] Rewrite mode kernel
Posted by GitBox <gi...@apache.org>.
cyb70289 commented on pull request #10009:
URL: https://github.com/apache/arrow/pull/10009#issuecomment-818452704
Benchmark on Skylake, built with clang-9.
```
-----------------------------------------------------------------------------------------------
Non-regressions: (40)
-----------------------------------------------------------------------------------------------
benchmark baseline contender change % counters
ModeKernelWide<Int32Type>/1048576/0 20.644 MiB/sec 66.187 MiB/sec 220.613 {}
ModeKernelWide<Int32Type>/1048576/100 20.768 MiB/sec 65.772 MiB/sec 216.699 {}
ModeKernelWide<Int32Type>/1048576/10000 20.626 MiB/sec 65.291 MiB/sec 216.546 {}
ModeKernelWide<Int32Type>/1048576/10 22.390 MiB/sec 69.904 MiB/sec 212.209 {}
ModeKernelWide<FloatType>/1048576/10000 18.174 MiB/sec 54.517 MiB/sec 199.975 {}
ModeKernelWide<FloatType>/1048576/0 18.423 MiB/sec 54.869 MiB/sec 197.826 {}
ModeKernelWide<FloatType>/1048576/10 20.009 MiB/sec 59.346 MiB/sec 196.601 {}
ModeKernelWide<FloatType>/1048576/100 18.486 MiB/sec 54.568 MiB/sec 195.186 {}
ModeKernelWide<Int64Type>/1048576/100 46.593 MiB/sec 136.786 MiB/sec 193.578 {}
ModeKernelWide<Int64Type>/1048576/0 46.434 MiB/sec 134.296 MiB/sec 189.221 {}
ModeKernelWide<Int64Type>/1048576/10000 46.435 MiB/sec 134.149 MiB/sec 188.894 {}
ModeKernelNarrow<Int64Type>/1048576/0 893.469 MiB/sec 2.515 GiB/sec 188.197 {}
ModeKernelNarrow<Int64Type>/1048576/10000 886.403 MiB/sec 2.484 GiB/sec 186.933 {}
ModeKernelWide<Int64Type>/1048576/10 50.182 MiB/sec 143.034 MiB/sec 185.029 {}
ModeKernelWide<DoubleType>/1048576/10000 40.525 MiB/sec 115.477 MiB/sec 184.953 {}
ModeKernelWide<DoubleType>/1048576/100 41.114 MiB/sec 116.862 MiB/sec 184.241 {}
ModeKernelWide<DoubleType>/1048576/10 44.555 MiB/sec 126.095 MiB/sec 183.008 {}
ModeKernelWide<DoubleType>/1048576/0 41.083 MiB/sec 113.499 MiB/sec 176.263 {}
ModeKernelWide<FloatType>/1048576/2 39.704 MiB/sec 106.142 MiB/sec 167.335 {}
ModeKernelNarrow<Int64Type>/1048576/100 853.217 MiB/sec 2.223 GiB/sec 166.836 {}
ModeKernelWide<Int32Type>/1048576/2 43.329 MiB/sec 115.209 MiB/sec 165.893 {}
ModeKernelWide<DoubleType>/1048576/2 88.485 MiB/sec 223.284 MiB/sec 152.340 {}
ModeKernelWide<Int64Type>/1048576/2 97.572 MiB/sec 236.831 MiB/sec 142.723 {}
ModeKernelNarrow<Int32Type>/1048576/0 715.706 MiB/sec 1.522 GiB/sec 117.703 {}
ModeKernelNarrow<Int32Type>/1048576/10000 714.237 MiB/sec 1.497 GiB/sec 114.577 {}
ModeKernelNarrow<Int64Type>/1048576/10 701.986 MiB/sec 1.399 GiB/sec 104.094 {}
ModeKernelNarrow<Int32Type>/1048576/100 666.834 MiB/sec 1.291 GiB/sec 98.234 {}
ModeKernelNarrow<Int64Type>/1048576/2 611.412 MiB/sec 1.057 GiB/sec 77.008 {}
ModeKernelNarrow<Int32Type>/1048576/10 499.852 MiB/sec 780.344 MiB/sec 56.115 {}
ModeKernelNarrow<Int32Type>/1048576/2 402.986 MiB/sec 562.758 MiB/sec 39.647 {}
ModeKernelNarrow<Int8Type>/1048576/1 499.705 GiB/sec 639.311 GiB/sec 27.938 {}
ModeKernelWide<DoubleType>/1048576/1 599.969 GiB/sec 738.218 GiB/sec 23.043 {}
ModeKernelNarrow<Int64Type>/1048576/1 610.213 GiB/sec 743.338 GiB/sec 21.816 {}
ModeKernelWide<Int64Type>/1048576/1 608.551 GiB/sec 735.741 GiB/sec 20.901 {}
ModeKernelWide<Int32Type>/1048576/1 611.261 GiB/sec 727.163 GiB/sec 18.961 {}
ModeKernelNarrow<Int32Type>/1048576/1 601.834 GiB/sec 714.017 GiB/sec 18.640 {}
ModeKernelWide<FloatType>/1048576/1 612.628 GiB/sec 716.591 GiB/sec 16.970 {}
ModeKernelNarrow<BooleanType>/1048576/1 628.851 GiB/sec 691.936 GiB/sec 10.032 {}
ModeKernelNarrow<BooleanType>/1048576/0 21.709 GiB/sec 22.397 GiB/sec 3.166 {}
ModeKernelNarrow<Int8Type>/1048576/2 293.420 MiB/sec 286.770 MiB/sec -2.266 {}
-------------------------------------------------------------------------------------------------
Regressions: (8)
-------------------------------------------------------------------------------------------------
benchmark baseline contender change % counters
ModeKernelNarrow<BooleanType>/1048576/10000 1.959 GiB/sec 1.856 GiB/sec -5.274 {}
ModeKernelNarrow<BooleanType>/1048576/2 1.957 GiB/sec 1.854 GiB/sec -5.293 {}
ModeKernelNarrow<BooleanType>/1048576/100 1.972 GiB/sec 1.866 GiB/sec -5.374 {}
ModeKernelNarrow<BooleanType>/1048576/10 1.974 GiB/sec 1.855 GiB/sec -6.027 {}
ModeKernelNarrow<Int8Type>/1048576/10 364.472 MiB/sec 342.415 MiB/sec -6.052 {}
ModeKernelNarrow<Int8Type>/1048576/0 484.491 MiB/sec 440.347 MiB/sec -9.111 {}
ModeKernelNarrow<Int8Type>/1048576/100 458.333 MiB/sec 416.045 MiB/sec -9.226 {}
ModeKernelNarrow<Int8Type>/1048576/10000 486.679 MiB/sec 439.186 MiB/sec -9.759 {}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org