You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/19 09:04:07 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #13630: ARROW-16852: [C++] Migrate remaining kernels to use ExecSpan, remove ExecBatchIterator

pitrou commented on code in PR #13630:
URL: https://github.com/apache/arrow/pull/13630#discussion_r924240870


##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -193,8 +193,11 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
 
   // Merge if necessary
   for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
+    // TODO: Return ExecSpan from GetUniques; but need to figure out memory
+    // management strategy

Review Comment:
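   Same here? (As with the other TODO added in this file, this would be better tracked in a JIRA than in a code comment.)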



##########
cpp/src/arrow/compute/exec/aggregate.cc:
##########
@@ -174,15 +171,18 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
       auto grouper = groupers[thread_index].get();
 
       // compute a batch of group ids
+      // TODO(wesm): refactor Grouper::Consume to write into preallocated
+      // memory

Review Comment:
   Rather than adding TODOs in the code, this would be better tracked and remembered using a JIRA, IMHO.



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -84,7 +51,10 @@ class ARROW_EXPORT ExecSpanIterator {
   /// \param[in] batch the input ExecBatch
   /// \param[in] max_chunksize the maximum length of each ExecSpan. Depending
   /// on the chunk layout of ChunkedArray.
-  Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize);
+  /// \param[in] promote_if_all_scalars if all of the values are scalars,
+  /// return them in each ExecSpan as ArraySpan of length 1

Review Comment:
   What happens otherwise? (i.e. if all values are scalars but `promote_if_all_scalars` is false)



##########
cpp/src/arrow/compute/row/grouper.cc:
##########
@@ -294,20 +282,21 @@ struct GrouperFastImpl : Grouper {
                                     ctx_->memory_pool()));
           }
         }
-        return ConsumeImpl(expanded);
+        return ConsumeImpl(ExecSpan(expanded));
       }
     }
     return ConsumeImpl(batch);
   }
 
-  Result<Datum> ConsumeImpl(const ExecBatch& batch) {
+  Result<Datum> ConsumeImpl(const ExecSpan& batch) {
     int64_t num_rows = batch.length;
     int num_columns = batch.num_values();
     // Process dictionaries
     for (int icol = 0; icol < num_columns; ++icol) {
       if (key_types_[icol].id() == Type::DICTIONARY) {
-        auto data = batch[icol].array();
-        auto dict = MakeArray(data->dictionary);
+        const ArraySpan& data = batch[icol].array;
+        // TODO(wesm): do not require ToArrayData here

Review Comment:
   Add a JIRA?



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -110,6 +80,7 @@ class ARROW_EXPORT ExecSpanIterator {
   bool initialized_ = false;
   bool have_chunked_arrays_ = false;
   bool have_all_scalars_ = false;
+  bool promote_if_all_scalars_ = false;

Review Comment:
   For the sake of avoiding confusion, can this have the same default value `true` as in the constructor?



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -609,20 +609,17 @@ struct VectorKernel : public Kernel {
 // ----------------------------------------------------------------------
 // ScalarAggregateKernel (for ScalarAggregateFunction)
 
-using ScalarAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;
-
-using ScalarAggregateMerge =
-    std::function<Status(KernelContext*, KernelState&&, KernelState*)>;
-
+typedef Status (*ScalarAggregateConsume)(KernelContext*, const ExecSpan&);
+typedef Status (*ScalarAggregateMerge)(KernelContext*, KernelState&&, KernelState*);
 // Finalize returns Datum to permit multiple return values
-using ScalarAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+typedef Status (*ScalarAggregateFinalize)(KernelContext*, Datum*);

Review Comment:
   The style guide discourages `typedef` and recommends `using` instead.



##########
cpp/src/arrow/compute/kernels/aggregate_basic_internal.h:
##########
@@ -448,9 +449,12 @@ struct MinMaxImpl : public ScalarAggregator {
     return Status::OK();
   }
 
-  Status ConsumeArray(const ArrayType& arr) {
+  Status ConsumeArray(const ArraySpan& arr_span) {
     StateType local;
 
+    // TODO(wesm): do not use ToArrayData
+    ArrayType arr(arr_span.ToArrayData());

Review Comment:
   Is this something that you plan to do in a later PR? Or can it just be done here?



##########
cpp/src/arrow/compute/kernels/hash_aggregate_test.cc:
##########
@@ -202,14 +202,16 @@ Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
   inputs.reserve(inputs.size() + keys.size());
   inputs.insert(inputs.end(), keys.begin(), keys.end());
 
-  ARROW_ASSIGN_OR_RAISE(auto batch_iterator,
-                        ExecBatchIterator::Make(inputs, ctx->exec_chunksize()));
+  ExecSpanIterator span_iterator;
+  ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make(inputs));
+  RETURN_NOT_OK(span_iterator.Init(batch, ctx->exec_chunksize()));
   BatchesWithSchema input;
   input.schema = schema(std::move(scan_fields));
-  ExecBatch batch;
-  while (batch_iterator->Next(&batch)) {
-    if (batch.length == 0) continue;
-    input.batches.push_back(std::move(batch));
+  ExecSpan span;
+  while (span_iterator.Next(&span)) {
+    if (span.length == 0) continue;
+    // TODO(wesm): investigate possibility of ExecPlans using ExecSpan

Review Comment:
   Should this be a JIRA instead? TODOs sprinkled in code are often ignored.



##########
cpp/src/arrow/compute/kernel.h:
##########
@@ -659,23 +656,20 @@ struct ScalarAggregateKernel : public Kernel {
 // ----------------------------------------------------------------------
 // HashAggregateKernel (for HashAggregateFunction)
 
-using HashAggregateResize = std::function<Status(KernelContext*, int64_t)>;
-
-using HashAggregateConsume = std::function<Status(KernelContext*, const ExecBatch&)>;
-
-using HashAggregateMerge =
-    std::function<Status(KernelContext*, KernelState&&, const ArrayData&)>;
+typedef Status (*HashAggregateResize)(KernelContext*, int64_t);
+typedef Status (*HashAggregateConsume)(KernelContext*, const ExecSpan&);
+typedef Status (*HashAggregateMerge)(KernelContext*, KernelState&&, const ArrayData&);
 
 // Finalize returns Datum to permit multiple return values
-using HashAggregateFinalize = std::function<Status(KernelContext*, Datum*)>;
+typedef Status (*HashAggregateFinalize)(KernelContext*, Datum*);

Review Comment:
   Same here: the style guide recommends `using` rather than `typedef`.



##########
cpp/src/arrow/compute/kernels/aggregate_quantile.cc:
##########
@@ -471,45 +471,6 @@ struct ExactQuantiler<InType, enable_if_t<is_decimal_type<InType>::value>> {
   SortQuantiler<InType> impl;
 };
 
-template <typename T>
-Status ScalarQuantile(KernelContext* ctx, const Scalar& scalar, ExecResult* out) {
-  const QuantileOptions& options = QuantileState::Get(ctx);
-  using CType = typename TypeTraits<T>::CType;
-  ArrayData* output = out->array_data().get();
-  output->length = options.q.size();
-  auto out_type = IsDataPoint(options) ? scalar.type : float64();
-  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
-                        ctx->Allocate(output->length * out_type->byte_width()));
-
-  if (!scalar.is_valid || options.min_count > 1) {
-    output->null_count = output->length;
-    ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(output->length));
-    bit_util::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, output->length,
-                        false);
-    if (IsDataPoint(options)) {
-      CType* out_buffer = output->template GetMutableValues<CType>(1);
-      std::fill(out_buffer, out_buffer + output->length, CType(0));
-    } else {
-      double* out_buffer = output->template GetMutableValues<double>(1);
-      std::fill(out_buffer, out_buffer + output->length, 0.0);
-    }
-    return Status::OK();
-  }
-  output->null_count = 0;
-  if (IsDataPoint(options)) {
-    CType* out_buffer = output->template GetMutableValues<CType>(1);
-    for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = UnboxScalar<T>::Unbox(scalar);
-    }
-  } else {
-    double* out_buffer = output->template GetMutableValues<double>(1);
-    for (int64_t i = 0; i < output->length; i++) {
-      out_buffer[i] = DataPointToDouble(UnboxScalar<T>::Unbox(scalar), *scalar.type);
-    }
-  }
-  return Status::OK();
-}
-

Review Comment:
   Neat removal :-)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org