You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/10 19:09:28 UTC

[GitHub] [arrow] save-buffer commented on a diff in pull request #13364: ARROW-16756: [C++] Introduce non-owning ArraySpan, ExecSpan data structures and refactor ScalarKernels to use them

save-buffer commented on code in PR #13364:
URL: https://github.com/apache/arrow/pull/13364#discussion_r894764398


##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -62,16 +63,65 @@ class ARROW_EXPORT ExecBatchIterator {
   int64_t max_chunksize() const { return max_chunksize_; }
 
  private:
-  ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
+  ExecBatchIterator(const std::vector<Datum>& args, int64_t length,
+                    int64_t max_chunksize);
 
-  std::vector<Datum> args_;
+  const std::vector<Datum>& args_;
   std::vector<int> chunk_indexes_;
   std::vector<int64_t> chunk_positions_;
   int64_t position_;
   int64_t length_;
   int64_t max_chunksize_;
 };
 
+/// \brief Break std::vector<Datum> into a sequence of non-owning
+/// ExecSpan for kernel execution. The lifetime of the Datum vector
+/// must be longer than the lifetime of this object
+class ARROW_EXPORT ExecSpanIterator {
+ public:
+  /// \brief Construct iterator and do basic argument validation
+  ///
+  /// \param[in] args the Datum argument, must be all array-like or scalar
+  /// \param[in] max_chunksize the maximum length of each ExecSpan. Depending
+  /// on the chunk layout of ChunkedArray.
+  static Result<std::unique_ptr<ExecSpanIterator>> Make(

Review Comment:
   Why do we need `unique_ptr` here? I think for something like an iterator it would make more sense to avoid any heap allocations



##########
cpp/src/arrow/compute/kernels/util_internal.h:
##########
@@ -67,14 +67,16 @@ int GetBitWidth(const DataType& type);
 // rather than duplicating compiled code to do all these in each kernel.
 PrimitiveArg GetPrimitiveArg(const ArrayData& arr);
 
-// Augment a unary ArrayKernelExec which supports only array-like inputs with support for
-// scalar inputs. Scalars will be transformed to 1-long arrays with the scalar's value (or
-// null if the scalar is null) as its only element. This 1-long array will be passed to
-// the original exec, then the only element of the resulting array will be extracted as
-// the output scalar. This could be far more efficient, but instead of optimizing this
-// it'd be better to support scalar inputs "upstream" in original exec.
-ArrayKernelExec TrivialScalarUnaryAsArraysExec(
-    ArrayKernelExec exec, NullHandling::type null_handling = NullHandling::INTERSECTION);
+// Augment a unary ScalarernelExec which supports only array-like inputs with

Review Comment:
   ScalarKernelExec



##########
cpp/src/arrow/compute/exec.cc:
##########
@@ -325,35 +352,158 @@ bool ExecBatchIterator::Next(ExecBatch* batch) {
   return true;
 }
 
+// ----------------------------------------------------------------------
+// ExecSpanIterator; to eventually replace ExecBatchIterator
+
+ExecSpanIterator::ExecSpanIterator(const std::vector<Datum>& args, int64_t length,
+                                   int64_t max_chunksize)
+    : args_(args), position_(0), length_(length), max_chunksize_(max_chunksize) {
+  chunk_indexes_.resize(args_.size(), 0);
+  value_positions_.resize(args_.size(), 0);
+  value_offsets_.resize(args_.size(), 0);
+}
+
+Result<std::unique_ptr<ExecSpanIterator>> ExecSpanIterator::Make(
+    const std::vector<Datum>& args, int64_t max_chunksize) {
+  int64_t length = 1;
+  RETURN_NOT_OK(GetBatchLength(args, &length));
+  max_chunksize = std::min(length, max_chunksize);
+  return std::unique_ptr<ExecSpanIterator>(
+      new ExecSpanIterator(args, length, max_chunksize));
+}
+
+int64_t ExecSpanIterator::GetNextChunkSpan(int64_t iteration_size, ExecSpan* span) {
+  for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
+    // If the argument is not a chunked array, it's either a Scalar or Array,
+    // in which case it doesn't influence the size of this span. Note that if
+    // the args are all scalars the span length is 1
+    if (!args_[i].is_chunked_array()) {
+      continue;
+    }
+    const ChunkedArray* arg = args_[i].chunked_array().get();
+    const Array* current_chunk;
+    while (true) {
+      current_chunk = arg->chunk(chunk_indexes_[i]).get();
+      if (value_positions_[i] == current_chunk->length()) {
+        // Chunk is zero-length, or was exhausted in the previous
+        // iteration. Move to the next chunk
+        ++chunk_indexes_[i];
+        current_chunk = arg->chunk(chunk_indexes_[i]).get();
+        span->values[i].SetArray(*current_chunk->data());
+        value_positions_[i] = 0;
+        value_offsets_[i] = current_chunk->offset();
+        continue;
+      }
+      break;
+    }
+    iteration_size =
+        std::min(current_chunk->length() - value_positions_[i], iteration_size);
+  }
+  return iteration_size;
+}
+
+bool ExecSpanIterator::Next(ExecSpan* span) {
+  if (position_ == length_) {
+    // This also protects from degenerate cases like ChunkedArrays
+    // without any chunks
+    return false;
+  }
+
+  if (!initialized_) {
+    span->length = 0;
+
+    // The first this this is called, we populate the output span with

Review Comment:
   nit typos: `first time`... `members updated during the iteration`



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -252,6 +254,192 @@ struct ARROW_EXPORT ExecBatch {
 inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals(r); }
 inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }
 
+struct ExecValue {

Review Comment:
   Big fan of this class.



##########
cpp/src/arrow/compute/exec.h:
##########
@@ -252,6 +254,192 @@ struct ARROW_EXPORT ExecBatch {
 inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals(r); }
 inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }
 
+struct ExecValue {
+  enum Kind { ARRAY, SCALAR };
+  Kind kind = ARRAY;
+  ArraySpan array;
+  const Scalar* scalar;
+
+  ExecValue(Scalar* scalar)  // NOLINT implicit conversion
+      : kind(SCALAR), scalar(scalar) {}
+
+  ExecValue(ArraySpan array)  // NOLINT implicit conversion
+      : kind(ARRAY), array(std::move(array)) {}
+
+  ExecValue(const ArrayData& array)  // NOLINT implicit conversion
+      : kind(ARRAY) {
+    this->array.SetMembers(array);
+  }
+
+  ExecValue() = default;
+  ExecValue(const ExecValue& other) = default;
+  ExecValue& operator=(const ExecValue& other) = default;
+  ExecValue(ExecValue&& other) = default;
+  ExecValue& operator=(ExecValue&& other) = default;
+
+  int64_t length() const { return this->is_array() ? this->array.length : 1; }
+
+  bool is_array() const { return this->kind == ARRAY; }
+  bool is_scalar() const { return this->kind == SCALAR; }
+
+  void SetArray(const ArrayData& array) {
+    this->kind = ARRAY;
+    this->array.SetMembers(array);
+  }
+
+  void SetScalar(const Scalar* scalar) {
+    this->kind = SCALAR;
+    this->scalar = scalar;
+  }
+
+  template <typename ExactType>
+  const ExactType& scalar_as() const {
+    return ::arrow::internal::checked_cast<const ExactType&>(*this->scalar);
+  }
+
+  /// XXX: here only temporarily until type resolution can be cleaned
+  /// up to not use ValueDescr
+  ValueDescr descr() const {
+    ValueDescr::Shape shape = this->is_array() ? ValueDescr::ARRAY : ValueDescr::SCALAR;
+    return ValueDescr(const_cast<DataType*>(this->type())->shared_from_this(), shape);
+  }
+
+  /// XXX: here temporarily for compatibility with datum, see
+  /// e.g. MakeStructExec in scalar_nested.cc
+  int64_t null_count() const {
+    if (this->is_array()) {
+      return this->array.GetNullCount();
+    } else {
+      return this->scalar->is_valid ? 0 : 1;
+    }
+  }
+
+  const DataType* type() const {
+    if (this->kind == ARRAY) {
+      return array.type;
+    } else {
+      return scalar->type.get();
+    }
+  }
+};
+
+struct ARROW_EXPORT ExecResult {
+  // The default value of the variant is ArraySpan
+  // TODO(wesm): remove Scalar output modality in ARROW-16577
+  util::Variant<ArraySpan, std::shared_ptr<ArrayData>, std::shared_ptr<Scalar>> value;
+
+  int64_t length() const {
+    if (this->is_array_span()) {
+      return this->array_span()->length;
+    } else if (this->is_array_data()) {
+      return this->array_data()->length;
+    } else {
+      // Should not reach here
+      return 1;
+    }
+  }
+
+  const DataType* type() const {
+    switch (this->value.index()) {
+      case 0:
+        return this->array_span()->type;
+      case 1:
+        return this->array_data()->type.get();
+      default:
+        // scalar
+        return this->scalar()->type.get();
+    };
+  }
+
+  ArraySpan* array_span() const {
+    return const_cast<ArraySpan*>(&util::get<ArraySpan>(this->value));
+  }
+  bool is_array_span() const { return this->value.index() == 0; }
+
+  const std::shared_ptr<ArrayData>& array_data() const {
+    return util::get<std::shared_ptr<ArrayData>>(this->value);
+  }
+
+  bool is_array_data() const { return this->value.index() == 1; }
+
+  const std::shared_ptr<Scalar>& scalar() const {
+    return util::get<std::shared_ptr<Scalar>>(this->value);
+  }
+
+  bool is_scalar() const { return this->value.index() == 2; }
+};
+
+/// \brief A "lightweight" column batch object which contains no
+/// std::shared_ptr objects and does not have any memory ownership
+/// semantics. Can represent a view onto an "owning" ExecBatch.
+struct ARROW_EXPORT ExecSpan {
+  ExecSpan() = default;
+  ExecSpan(const ExecSpan& other) = default;
+  ExecSpan& operator=(const ExecSpan& other) = default;
+  ExecSpan(ExecSpan&& other) = default;
+  ExecSpan& operator=(ExecSpan&& other) = default;
+
+  explicit ExecSpan(std::vector<ExecValue> values, int64_t length)
+      : length(length), values(std::move(values)) {}
+
+  explicit ExecSpan(const ExecBatch& batch) {
+    this->length = batch.length;
+    this->values.resize(batch.values.size());
+    for (size_t i = 0; i < batch.values.size(); ++i) {
+      const Datum& in_value = batch[i];
+      ExecValue* out_value = &this->values[i];
+      if (in_value.is_array()) {
+        out_value->SetArray(*in_value.array());
+      } else {
+        out_value->SetScalar(in_value.scalar().get());
+      }
+    }
+  }
+
+  bool is_all_scalar() const {
+    return std::all_of(this->values.begin(), this->values.end(),
+                       [](const ExecValue& v) { return v.is_scalar(); });
+  }
+
+  /// \brief Return the value at the i-th index
+  template <typename index_type>
+  inline const ExecValue& operator[](index_type i) const {
+    return values[i];
+  }
+
+  void AddOffset(int64_t offset) {
+    for (ExecValue& value : values) {
+      if (value.kind == ExecValue::ARRAY) {
+        value.array.AddOffset(offset);
+      }
+    }
+  }
+
+  void SetOffset(int64_t offset) {
+    for (ExecValue& value : values) {
+      if (value.kind == ExecValue::ARRAY) {
+        value.array.SetOffset(offset);
+      }
+    }
+  }
+
+  /// \brief A convenience for the number of values / arguments.
+  int num_values() const { return static_cast<int>(values.size()); }
+
+  // XXX: eliminate the need for ValueDescr; copied temporarily from
+  // ExecBatch
+  std::vector<ValueDescr> GetDescriptors() const {
+    std::vector<ValueDescr> result;
+    for (const auto& value : this->values) {
+      result.emplace_back(value.descr());
+    }
+    return result;
+  }
+
+  int64_t length;
+  std::vector<ExecValue> values;

Review Comment:
   I am a little suspicious about this std::vector. It may be OK for now, but eventually I'd like to avoid small heap allocations. When we add a bump allocator for this kind of stuff within Acero, I'd like to switch to that.
   
   If we want to support that now, we can just make this ExecSpan take a pointer to ExecValue and the number of ExecValues, so that ExecSpan doesn't have to touch the heap at all either.



##########
cpp/src/arrow/array/data.h:
##########
@@ -242,6 +245,127 @@ struct ARROW_EXPORT ArrayData {
   std::shared_ptr<ArrayData> dictionary;
 };
 
+/// \brief A non-owning Buffer reference
+struct ARROW_EXPORT BufferRef {
+  // It is the user of this class's responsibility to ensure that
+  // buffers that were const originally are not written to
+  // accidentally.
+  uint8_t* data = NULLPTR;
+  int64_t size = 0;
+  // Pointer back to buffer that owns this memory
+  const std::shared_ptr<Buffer>* owner = NULLPTR;
+};
+
+/// \brief EXPERIMENTAL: A non-owning ArrayData reference that is cheaply
+/// copyable and does not contain any shared_ptr objects. Do not use in public
+/// APIs aside from compute kernels for now
+struct ARROW_EXPORT ArraySpan {
+  const DataType* type;
+  int64_t length = 0;
+  mutable int64_t null_count = kUnknownNullCount;
+  int64_t offset = 0;
+  BufferRef buffers[3];
+
+  ArraySpan() = default;
+
+  explicit ArraySpan(const DataType* type, int64_t length) : type(type), length(length) {}
+  explicit ArraySpan(const ArrayData& data) { SetMembers(data); }
+  explicit ArraySpan(const Scalar& data) { FillFromScalar(data); }
+
+  /// If dictionary-encoded, put dictionary in the first entry
+  // TODO(wesm): would a std::unique_ptr<vector<...>> be better?

Review Comment:
   I don't see why it would be; `unique_ptr<vector>` here would just amount to having a pointer to a pointer to an array instead of just a pointer to an array.



##########
cpp/src/arrow/compute/exec_internal.h:
##########
@@ -62,16 +63,65 @@ class ARROW_EXPORT ExecBatchIterator {
   int64_t max_chunksize() const { return max_chunksize_; }
 
  private:
-  ExecBatchIterator(std::vector<Datum> args, int64_t length, int64_t max_chunksize);
+  ExecBatchIterator(const std::vector<Datum>& args, int64_t length,
+                    int64_t max_chunksize);
 
-  std::vector<Datum> args_;
+  const std::vector<Datum>& args_;
   std::vector<int> chunk_indexes_;
   std::vector<int64_t> chunk_positions_;
   int64_t position_;
   int64_t length_;
   int64_t max_chunksize_;
 };
 
+/// \brief Break std::vector<Datum> into a sequence of non-owning
+/// ExecSpan for kernel execution. The lifetime of the Datum vector
+/// must be longer than the lifetime of this object
+class ARROW_EXPORT ExecSpanIterator {
+ public:
+  /// \brief Construct iterator and do basic argument validation
+  ///
+  /// \param[in] args the Datum argument, must be all array-like or scalar
+  /// \param[in] max_chunksize the maximum length of each ExecSpan. Depending
+  /// on the chunk layout of ChunkedArray.
+  static Result<std::unique_ptr<ExecSpanIterator>> Make(

Review Comment:
   Why do we need `unique_ptr` here? I think for something like an iterator it would make more sense to avoid any heap allocations.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org