Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/13 05:50:50 UTC

[GitHub] [arrow] michalursa opened a new pull request, #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

michalursa opened a new pull request, #12872:
URL: https://github.com/apache/arrow/pull/12872

   Adding utilities that make it easier to filter, replicate, and accumulate rows from potentially multiple sources into a single exec batch.
   This is meant to be used in the hash join to reduce the overhead of producing its output.
   
   Also moving the lightweight array utilities to the same file and adding helper functions for generating them.
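   
   A rough sketch of the intended usage (illustrative only; `pool`, `batch`, `row_ids`, and `num_row_ids` are placeholders for whatever the join output stage provides):
   
   ```
   ExecBatchBuilder builder;
   
   // Append a filtered selection of rows from an incoming batch. The builder
   // caps itself at num_rows_max() rows and reports how many it actually took.
   int num_appended = 0;
   RETURN_NOT_OK(builder.AppendSelected(pool, batch, num_row_ids, row_ids,
                                        &num_appended,
                                        static_cast<int>(batch.values.size())));
   
   // Once enough rows have accumulated, materialize them as a single ExecBatch.
   if (builder.num_rows() > 0) {
     ExecBatch combined = builder.Flush();
   }
   ```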
   
   Thanks to Weston Pace for updating comments and writing the unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1097586004

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.




[GitHub] [arrow] westonpace commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r852550757


##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < max_buffers_ - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Should not reach this point, caller attempted to create a KeyColumnArray from an
+  // invalid type
+  ARROW_DCHECK(false);
+  return KeyColumnMetadata(true, sizeof(int));
+}
+
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows) {
+  KeyColumnArray column_array = KeyColumnArray(
+      ColumnMetadataFromDataType(array_data->type),
+      array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_metadatas)[i] = ColumnMetadataFromDataType(array_data->type);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_arrays)[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length), column_arrays);
+}
+
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before = ColumnMetadataFromDataType(data_type_);
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type);
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    non_null_buf_.reset();
+    fixed_len_buf_.reset();
+    var_len_buf_.reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (fixed_len_buf_ == NULLPTR) {
+    ARROW_DCHECK(non_null_buf_ == NULLPTR && var_len_buf_ == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        non_null_buf_,
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(
+          fixed_len_buf_,
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(var_len_buf_, AllocateResizableBuffer(
+                                            sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    ARROW_DCHECK(non_null_buf_ != NULLPTR && var_len_buf_ != NULLPTR);
+
+    RETURN_NOT_OK(non_null_buf_->Resize(bit_util::BytesForBits(num_rows_allocated_new) +
+                                        kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(fixed_len_buf_->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(
+        reinterpret_cast<const uint32_t*>(fixed_len_buf_->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(var_len_buf_->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+  return KeyColumnArray(column_metadata, num_rows_, non_null_buf_->mutable_data(),
+                        fixed_len_buf_->mutable_data(), var_len_buf_->mutable_data());
+}
+
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  auto valid_count = arrow::internal::CountSetBits(non_null_buf_->data(), /*offset=*/0,
+                                                   static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_, {non_null_buf_, fixed_len_buf_},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {non_null_buf_, fixed_len_buf_, var_len_buf_}, null_count);
+  }
+}
+
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(column->type);
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type);
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(source->type);
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type);
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int* num_appended, int num_cols,
+                                        const int* col_ids) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendSelected(pool, batch, num_rows_next, row_ids, num_cols, col_ids));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append, int* num_appended) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendNulls(pool, types, num_rows_next));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);

Review Comment:
   This is ok. I also realized that an empty ExecBatch might not be possible if we haven't seen any data yet because we wouldn't know the types for the columns.
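   
   For downstream code the guard then looks roughly like this (a sketch only, assuming `builder` is an `ExecBatchBuilder` that may not have received any rows yet):
   
   ```
   if (builder.num_rows() > 0) {
     // Flush() DCHECKs that at least one row is present, so only flush once
     // some rows have actually been appended.
     ExecBatch out = builder.Flush();
     // ... hand `out` downstream ...
   }
   ```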





[GitHub] [arrow] ursabot commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1107674168

   Benchmark runs are scheduled for baseline = 2e7acabf7ba9c4df8621918e82fe795a227a3b31 and contender = b0c75dee34de65834e5a83438e6581f90970fd3d. b0c75dee34de65834e5a83438e6581f90970fd3d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/16fe70ac5c264e499cce56c24b8e5ccd...a25a224b6dc44929ba364d9cb76ffeda/)
   [Failed] [test-mac-arm](https://conbench.ursa.dev/compare/runs/5ff4b178d30142488a895bb82ba01e39...36ea867286c44e72b2e48323d10b7890/)
   [Failed :arrow_down:0.0% :arrow_up:0.75%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/e9b14301584c4d9d9ec5bd7df77c46a0...bcfc5e707eb946b699058ee3855a2c53/)
   [Finished :arrow_down:0.25% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/6c0d2641c4894670857226310bd264c3...a7a2e952fb1c45e4b754addc884f9510/)
   Buildkite builds:
   [Finished] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/571| `b0c75dee` ec2-t3-xlarge-us-east-2>
   [Failed] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/559| `b0c75dee` test-mac-arm>
   [Failed] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/557| `b0c75dee` ursa-i9-9960x>
   [Finished] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/569| `b0c75dee` ursa-thinkcentre-m75q>
   [Finished] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/570| `2e7acabf` ec2-t3-xlarge-us-east-2>
   [Failed] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/558| `2e7acabf` test-mac-arm>
   [Failed] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/556| `2e7acabf` ursa-i9-9960x>
   [Finished] <https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/568| `2e7acabf` ursa-thinkcentre-m75q>
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] westonpace commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r850787225


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {

Review Comment:
   Can we add constants like:
   
   ```
   KeyColumnArray::kValidityBuffer
   KeyColumnArray::kFixedSizeBuffer
   KeyColumnArray::kVariableLengthBuffer
   ```
   
   It might help for readability.  Or we could split this into three accessors.
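   
   For example, something along these lines (just a sketch using the names suggested above; not part of the current patch):
   
   ```
   class KeyColumnArray {
    public:
     // Buffer indices, mirroring the documented layout:
     static constexpr int kValidityBuffer = 0;        // null bitmap
     static constexpr int kFixedSizeBuffer = 1;       // fixed-width values or offsets
     static constexpr int kVariableLengthBuffer = 2;  // varbinary payload
     // ... existing members unchanged ...
   };
   
   // A call site would then read:
   //   const uint8_t* validity = column.data(KeyColumnArray::kValidityBuffer);
   //   const uint32_t* offsets =
   //       reinterpret_cast<const uint32_t*>(column.data(KeyColumnArray::kFixedSizeBuffer));
   ```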



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;
+};
+
+/// \brief A builder to concatenate batches of data into a larger batch
+///
+/// Will only store num_rows_max() rows
+class ExecBatchBuilder {
+ public:
+  /// \brief Add rows from `source` into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
+                               ResizableArrayData* target, int num_rows_to_append,
+                               const uint16_t* row_ids, MemoryPool* pool);
+
+  /// \brief Add nulls into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendNulls(const std::shared_ptr<DataType>& type,
+                            ResizableArrayData& target, int num_rows_to_append,
+                            MemoryPool* pool);
+
+  /// \brief Add selected rows from `batch`
+  ///
+/// If `col_ids` is null then `num_cols` should be less than batch.num_values() and
+  /// the first `num_cols` columns of batch will be appended.
+  ///
+  /// All columns in `batch` must have array shape
+  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
+                        const uint16_t* row_ids, int num_cols,
+                        const int* col_ids = NULLPTR);

Review Comment:
   It's a little confusing that we have overloads that store `num_appended` and overloads that do not store `num_appended`.  The former are also able to exceed `num_rows_max()` I think.  What is the intended use case for the overloads that don't store `num_appended`?
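   
   For reference, the overload that does store `num_appended` seems intended for a loop like this (a sketch only; `builder`, `batch`, `row_ids`, `num_rows`, and `pool` come from the caller, and `EmitBatch` is a placeholder for the downstream consumer):
   
   ```
   int num_processed = 0;
   while (num_processed < num_rows) {
     int num_appended = 0;
     RETURN_NOT_OK(builder.AppendSelected(pool, batch, num_rows - num_processed,
                                          row_ids + num_processed, &num_appended,
                                          static_cast<int>(batch.values.size())));
     num_processed += num_appended;
     if (num_appended == 0) {
       // The builder is full; emit the accumulated rows and continue
       // (assuming Flush() resets the builder).
       EmitBatch(builder.Flush());
     }
   }
   ```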



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;

Review Comment:
   Here we have three explicitly named buffers, while KeyColumnArray uses a fixed-size array of buffers.  We should probably align on a single approach.  Do you have a preference between the two, or a reason for them to be different?
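
   For reference, the indexed flavour applied to the owning side would look roughly like this (illustration only; `OwnedKeyColumnBuffers` is a made-up name, not something proposed in this PR):
   ```cpp
   #include <array>
   #include <cstdint>
   #include <memory>

   #include "arrow/buffer.h"

   // Sketch: the same indexed layout KeyColumnArray uses, but with owning
   // ResizableBuffer slots, so mutable_data(i) becomes a single array lookup
   // instead of a chained ternary over three named members.
   struct OwnedKeyColumnBuffers {
     static constexpr int kMaxBuffers = 3;
     std::array<std::shared_ptr<arrow::ResizableBuffer>, kMaxBuffers> buffers;

     uint8_t* mutable_data(int i) { return buffers[i]->mutable_data(); }
   };
   ```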



##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < max_buffers_ - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Should not reach this point, caller attempted to create a KeyColumnArray from an
+  // invalid type
+  ARROW_DCHECK(false);
+  return KeyColumnMetadata(true, sizeof(int));
+}
+
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows) {
+  KeyColumnArray column_array = KeyColumnArray(
+      ColumnMetadataFromDataType(array_data->type),
+      array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_metadatas)[i] = ColumnMetadataFromDataType(array_data->type);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_arrays)[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length), column_arrays);
+}
+
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before = ColumnMetadataFromDataType(data_type_);
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type);
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    non_null_buf_.reset();
+    fixed_len_buf_.reset();
+    var_len_buf_.reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (fixed_len_buf_ == NULLPTR) {
+    ARROW_DCHECK(non_null_buf_ == NULLPTR && var_len_buf_ == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        non_null_buf_,
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(
+          fixed_len_buf_,
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(var_len_buf_, AllocateResizableBuffer(
+                                            sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    ARROW_DCHECK(non_null_buf_ != NULLPTR && var_len_buf_ != NULLPTR);
+
+    RETURN_NOT_OK(non_null_buf_->Resize(bit_util::BytesForBits(num_rows_allocated_new) +
+                                        kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(fixed_len_buf_->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(
+        reinterpret_cast<const uint32_t*>(fixed_len_buf_->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(var_len_buf_->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+  return KeyColumnArray(column_metadata, num_rows_, non_null_buf_->mutable_data(),
+                        fixed_len_buf_->mutable_data(), var_len_buf_->mutable_data());
+}
+
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  auto valid_count = arrow::internal::CountSetBits(non_null_buf_->data(), /*offset=*/0,
+                                                   static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_, {non_null_buf_, fixed_len_buf_},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {non_null_buf_, fixed_len_buf_, var_len_buf_}, null_count);
+  }
+}
+
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(column->type);
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type);
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(source->type);
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type);
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int* num_appended, int num_cols,
+                                        const int* col_ids) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendSelected(pool, batch, num_rows_next, row_ids, num_cols, col_ids));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append, int* num_appended) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendNulls(pool, types, num_rows_next));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);

Review Comment:
   This feels like it might be a difficult assertion for the caller to maintain.  What is the downside of returning an empty `ExecBatch` in this situation?
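
   For example, a wrapper along these lines (a sketch, assuming `ExecBatchBuilder` is declared in `light_array.h` as in this PR; `FlushOrEmpty` is a made-up name) is the shape I had in mind:
   ```cpp
   #include "arrow/compute/exec.h"
   #include "arrow/compute/light_array.h"

   // Sketch of the alternative being asked about: hand back a zero-length
   // ExecBatch when nothing was accumulated instead of asserting.
   arrow::compute::ExecBatch FlushOrEmpty(arrow::compute::ExecBatchBuilder* builder) {
     if (builder->num_rows() == 0) {
       return arrow::compute::ExecBatch({}, /*length=*/0);
     }
     return builder->Flush();
   }
   ```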





[GitHub] [arrow] lidavidm commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851066765


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.

Review Comment:
   Understood. Indeed, at the very least it helps show what a replacement might look like.
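
   Concretely, the "replacement" usage is roughly a zero-allocation view over an existing `ArrayData`, using the signatures as they appear in this revision (sketch only; `ViewOf` is a made-up helper):
   ```cpp
   #include <memory>

   #include "arrow/array.h"
   #include "arrow/compute/light_array.h"

   // Sketch: wrapping an existing ArrayData in the lightweight view, which is
   // the sense in which these types stand in for ArrayData on hot paths.
   arrow::compute::KeyColumnArray ViewOf(
       const std::shared_ptr<arrow::ArrayData>& data) {
     return arrow::compute::ColumnArrayFromArrayData(
         data, /*start_row=*/0, /*num_rows=*/static_cast<int>(data->length));
   }
   ```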





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851100413


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {

Review Comment:
   I added the constants.
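
   For example, a call site can now name the buffer it wants instead of passing 0/1/2 (hypothetical helper, not part of the PR):
   ```cpp
   #include <cstdint>

   #include "arrow/compute/light_array.h"

   // Hypothetical call site: the named constant replaces the magic index 0
   // when reaching into a KeyColumnArray.
   const uint8_t* ValidityBits(const arrow::compute::KeyColumnArray& array) {
     return array.data(arrow::compute::KeyColumnArray::kValidityBuffer);
   }
   ```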





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851134165


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)

Review Comment:
   I changed it to return Result<>. 
   
   In hash join (which will be the first consumer of this interface) the data type checks are already done during ExecNode initialization, so returning a status introduces redundant checks at those call sites.
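
   The extra cost is just the usual `Result<>` plumbing at each call site, e.g. (a sketch that assumes the new signature is `Result<KeyColumnMetadata> ColumnMetadataFromDataType(...)`; `FixedLengthOf` is a made-up caller):
   ```cpp
   #include <cstdint>
   #include <memory>

   #include "arrow/compute/light_array.h"
   #include "arrow/result.h"
   #include "arrow/type.h"

   // Sketch of the call-site shape after the switch to Result<>: even callers
   // that already validated the type at ExecNode init time go through status
   // propagation on every call.
   arrow::Result<uint32_t> FixedLengthOf(const std::shared_ptr<arrow::DataType>& type) {
     ARROW_ASSIGN_OR_RAISE(arrow::compute::KeyColumnMetadata metadata,
                           arrow::compute::ColumnMetadataFromDataType(type));
     return metadata.fixed_length;
   }
   ```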





[GitHub] [arrow] westonpace commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r852554656


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+  // Constants used for accessing buffers using data() and mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));

Review Comment:
   ```suggestion
       return reinterpret_cast<const uint32_t*>(data(kFixedLengthBuffer));
   ```



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+  // Constants used for accessing buffers using data() and mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));

Review Comment:
   ```suggestion
       return reinterpret_cast<uint32_t*>(mutable_data(kFixedLengthBuffer));
   ```



##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}

Review Comment:
   ```suggestion
   KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
                                  const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer,
                                   const uint8_t* var_length_buffer, int bit_offset_validity, int bit_offset_fixed) {
     static_assert(std::is_pod<KeyColumnArray>::value,
                   "This class was intended to be a POD type");
     metadata_ = metadata;
     length_ = length;
     buffers_[kValidityBuffer] = validity_buffer;
     buffers_[kFixedLengthBuffer] = fixed_length_buffer;
     buffers_[kVariableLengthBuffer] = var_length_buffer;
     mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
     bit_offset_[kValidityBuffer] = bit_offset_validity;
     bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
   }
   ```



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);

Review Comment:
   ```suggestion
     KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* validity_buffer,
                    uint8_t* fixed_length_buffer, uint8_t* var_length_buffer, int bit_offset_validity = 0,
                    int bit_offset_fixed = 0);
   ```



##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}

Review Comment:
   ```suggestion
   KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
                                  uint8_t* validity_buffer, uint8_t* fixed_length_buffer, uint8_t* var_length_buffer,
                                  int bit_offset_validity, int bit_offset_fixed) {
     metadata_ = metadata;
     length_ = length;
     buffers_[kValidityBuffer] = mutable_buffers_[kValidityBuffer] = validity_buffer;
     buffers_[kFixedLengthBuffer] = mutable_buffers_[kFixedLengthBuffer] = fixed_length_buffer;
     buffers_[kVariableLengthBuffer] = mutable_buffers_[kVariableLengthBuffer] = var_length_buffer;
     bit_offset_[kValidityBuffer] = bit_offset_validity;
     bit_offset_[kFixedLengthBuffer] = bit_offset_fixed;
   }
   ```



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);

Review Comment:
   ```suggestion
     KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
                    const uint8_t* validity_buffer, const uint8_t* fixed_length_buffer, const uint8_t* var_length_buffer,
                    int bit_offset_validity = 0, int bit_offset_fixed = 0);
   ```
   
   These changes might be slightly tedious (you will need to run lint if you accept my changes as a batch), but since this is part of the API it will help with auto-complete and readability in places where we do things like `/*validity_buffer=*/`.
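   For illustration, here is a minimal sketch of the call-site readability this buys, assuming the renamed read-only constructor above is adopted. The wrapper function, buffer pointers, and the int32 example are hypothetical; only the KeyColumnMetadata and KeyColumnArray signatures come from this PR.

   ```cpp
   #include <cstdint>

   #include "arrow/compute/light_array.h"  // header added by this PR

   using arrow::compute::KeyColumnArray;
   using arrow::compute::KeyColumnMetadata;

   // Hypothetical call site: with the renamed parameters the argument comments
   // line up with the declaration, so a reader can tell which buffer is which.
   KeyColumnArray MakeInt32View(const uint8_t* validity_bits, const uint8_t* values_bytes,
                                int64_t num_rows) {
     KeyColumnMetadata metadata(/*is_fixed_length_in=*/true,
                                /*fixed_length_in=*/sizeof(int32_t));
     return KeyColumnArray(metadata, num_rows,
                           /*validity_buffer=*/validity_bits,
                           /*fixed_length_buffer=*/values_bytes,
                           /*var_length_buffer=*/nullptr);
   }
   ```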



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851066060


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.

Review Comment:
   Yes, this isn't intended to be a wholesale replacement for ExecBatch; rather, it is something small enough to get a PR through that improves hash-join performance.  Once that is done we can either grow these tools to support more types, replace them with something more complete and more efficient, or accept multiple variations of kernels/nodes depending on benchmarks and need.
   
   I think that having these tools here will make it easier to evaluate and work on potential replacements for ExecBatch, even for someone who doesn't know the ins and outs of the join algorithm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] save-buffer commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
save-buffer commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r850736637


##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};
+
+TEST(KeyColumnMetadata, FromDataType) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean());
+  ASSERT_EQ(0, metadata.fixed_length);
+  ASSERT_EQ(true, metadata.is_fixed_length);
+  ASSERT_EQ(false, metadata.is_null_type);
+
+  metadata = ColumnMetadataFromDataType(null());
+  ASSERT_EQ(true, metadata.is_null_type);
+
+  for (const auto& type : kSampleFixedDataTypes) {
+    int byte_width =
+        internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(byte_width, metadata.fixed_length);
+    ASSERT_EQ(true, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {binary(), utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(4, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {large_binary(), large_utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(8, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+}
+
+TEST(KeyColumnArray, FromArrayData) {
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
+    // source array) while `offset` is the offset we pass when constructing the
+    // KeyColumnArray
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array;
+        int byte_width =
+            internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+        if (is_decimal(type->id())) {
+          array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])");
+        } else {
+          array = ArrayFromJSON(type, "[1, 2, null]");
+        }
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * byte_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        // Maximum tested offset is < 8 so validity is just bit offset
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(nullptr, kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBinary) {
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
+        int offsets_width =
+            static_cast<int>(internal::checked_pointer_cast<BaseBinaryType>(type)
+                                 ->layout()
+                                 .buffers[1]
+                                 .byte_width);
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * offsets_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBool) {
+  for (auto array_offset : {0, 1}) {
+    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+    for (auto offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+      std::shared_ptr<Array> array = ArrayFromJSON(boolean(), "[true, false, null]");
+      array = array->Slice(array_offset);
+      int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+      KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1));
+      ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+      ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1));
+      ASSERT_EQ(length, kc_array.length());
+      ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+      ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+    }
+  }
+}
+
+// TEST(KeyColumnArray, FromArrayDataNull) {

Review Comment:
   remove or uncomment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851066303


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.

Review Comment:
   The KeyColumnMetadata and KeyColumnArray classes are used in hash group-by and in the new hash join for interaction with the hash table. The reason for having them was to be able to work on smaller batches than ExecBatch and therefore keep the intermediate results of multi-step computations in the L1 cache. The goal was to get rid of the overhead of memory allocations and shared pointers (and the atomic instructions used for ref counting) when doing things like slicing.
   
   If we make ExecBatch more lightweight in the future, it is likely that the new ExecBatch structures would be good for hash tables. But the KeyColumn... classes so far restrict the functionality to the minimum needed for hash-table keys, both in terms of data type support and in how the data is abstracted (e.g. the hash table only cares about data movement; interpreting the bytes in column values is left to callback functions).
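   As a concrete illustration of the pattern described above, here is a minimal sketch of walking an ExecBatch in cache-sized mini batches through non-owning views. The function name, mini batch size, and loop body are hypothetical; ColumnArraysFromExecBatch and KeyColumnArray are the declarations shown earlier in this review.

   ```cpp
   #include <algorithm>
   #include <cstdint>
   #include <vector>

   #include "arrow/compute/light_array.h"  // header added by this PR

   using arrow::compute::ColumnArraysFromExecBatch;
   using arrow::compute::ExecBatch;
   using arrow::compute::KeyColumnArray;

   // Hypothetical inner loop: ColumnArraysFromExecBatch only fills a vector of
   // non-owning views, so no buffers are allocated and no shared_ptr reference
   // counts are touched per mini batch.
   void ForEachMiniBatch(const ExecBatch& batch) {
     constexpr int64_t kMiniBatchSize = 1024;  // multiple of 8 keeps bit vectors byte-aligned
     std::vector<KeyColumnArray> columns;
     for (int64_t start = 0; start < batch.length; start += kMiniBatchSize) {
       int num_rows =
           static_cast<int>(std::min(kMiniBatchSize, batch.length - start));
       ColumnArraysFromExecBatch(batch, static_cast<int>(start), num_rows, &columns);
       // ... run one step of the hash table computation on `columns` ...
     }
   }
   ```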



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851102132


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;
+};
+
+/// \brief A builder to concatenate batches of data into a larger batch
+///
+/// Will only store num_rows_max() rows
+class ExecBatchBuilder {
+ public:
+  /// \brief Add rows from `source` into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendSelected(const std::shared_ptr<ArrayData>& source,
+                               ResizableArrayData* target, int num_rows_to_append,
+                               const uint16_t* row_ids, MemoryPool* pool);
+
+  /// \brief Add nulls into `target` column
+  ///
+  /// If `target` is uninitialized or cleared it will be initialized to use
+  /// the given pool.
+  static Status AppendNulls(const std::shared_ptr<DataType>& type,
+                            ResizableArrayData& target, int num_rows_to_append,
+                            MemoryPool* pool);
+
+  /// \brief Add selected rows from `batch`
+  ///
+  /// If `col_ids` is null then `num_cols` should be less than batch.num_values() and
+  /// the first `num_cols` columns of batch will be appended.
+  ///
+  /// All columns in `batch` must have array shape
+  Status AppendSelected(MemoryPool* pool, const ExecBatch& batch, int num_rows_to_append,
+                        const uint16_t* row_ids, int num_cols,
+                        const int* col_ids = NULLPTR);

Review Comment:
   The AppendSelected and AppendNulls variants that returned num_appended were wrappers on top of the versions that did not. They can be removed as long as the caller is willing to make sure they don't exceed the batch size limit (there isn't anything incorrect about exceeding the limit at this point, but consumers of exec batches often assume that rows can be indexed using 16-bit integers).
   
   I removed the variants that returned num_appended (they were not used in the new hash join) and added an error status that is returned when the limit would be exceeded.
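   To make the limit concrete, here is a minimal sketch of a caller guarding an append against the batch size limit. The wrapper function and the error message are hypothetical; the AppendSelected signature comes from the header in this PR, and num_rows_max() is the accessor referred to by the "Will only store num_rows_max() rows" comment above.

   ```cpp
   #include <cstdint>
   #include <vector>

   #include "arrow/compute/light_array.h"  // header added by this PR
   #include "arrow/memory_pool.h"
   #include "arrow/status.h"

   using arrow::MemoryPool;
   using arrow::Status;
   using arrow::compute::ExecBatch;
   using arrow::compute::ExecBatchBuilder;

   // Hypothetical caller: reject an over-sized selection up front so downstream
   // consumers that index rows with uint16_t never see a batch above the limit.
   Status AppendGuarded(ExecBatchBuilder* builder, MemoryPool* pool,
                        const ExecBatch& batch, const std::vector<uint16_t>& row_ids,
                        int num_cols) {
     if (static_cast<int64_t>(row_ids.size()) > builder->num_rows_max()) {
       return Status::Invalid("selection exceeds the output batch size limit");
     }
     return builder->AppendSelected(pool, batch, static_cast<int>(row_ids.size()),
                                    row_ids.data(), num_cols);
   }
   ```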



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851077345


##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};

Review Comment:
   There is no support for these data types yet (but there probably will be in the future).
   I can remove the commented-out part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] michalursa commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1099882476

   > Looks good to me, I guess I already reviewed a bunch of this code on the other PR. One thing: is it worth to add `Hashing32::HashBatch` and `Hashing64::HashBatch` in this PR? I need those for Bloom Filter pushdown, not sure if those fit better here or there.
   
   Hashing seems unrelated to the topic of this PR, but we could possibly make a tiny separate PR just for HashBatch (or put it into Bloom filter pushdown).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851090256


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type);
+
+/// \brief Create KeyColumnArray from ArrayData
+///
+/// If `type` is a dictionary type then this will return the KeyColumnArray for
+/// the indices array
+///
+/// The caller should ensure this is only called on "key" columns.
+/// \see ColumnMetadataFromDataType for details
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows);
+
+/// \brief Create KeyColumnMetadata instances from an ExecBatch
+///
+/// column_metadatas will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnMetadataFromDataType for more details
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas);
+
+/// \brief Create KeyColumnArray instances from a slice of an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// \brief Create KeyColumnArray instances from an ExecBatch
+///
+/// column_arrays will be resized to fit
+///
+/// All columns in `batch` must be eligible "key" columns and have an array shape
+/// \see ColumnArrayFromArrayData for more details
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays);
+
+/// A lightweight resizable array for "key" columns
+///
+/// Unlike KeyColumnArray this instance owns its buffers
+///
+/// Resizing is handled by arrow::ResizableBuffer and a doubling approach is
+/// used so that resizes will always grow up to the next power of 2
+class ResizableArrayData {
+ public:
+  /// \brief Create an uninitialized instance
+  ///
+  /// Init must be called before calling any other operations
+  ResizableArrayData()
+      : log_num_rows_min_(0),
+        pool_(NULLPTR),
+        num_rows_(0),
+        num_rows_allocated_(0),
+        var_len_buf_size_(0) {}
+  ~ResizableArrayData() { Clear(true); }
+  /// \brief Initialize the array
+  /// \param data_type The data type this array is holding data for.
+  /// \param pool The pool to make allocations on
+  /// \param log_num_rows_min All resize operations will allocate at least enough
+  ///                         space for (1 << log_num_rows_min) rows
+  void Init(const std::shared_ptr<DataType>& data_type, MemoryPool* pool,
+            int log_num_rows_min);
+  /// \brief Resets the array back to an empty state
+  /// \param release_buffers If true then allocated memory is released and the
+  ///                        next resize operation will have to reallocate memory
+  void Clear(bool release_buffers);
+  /// \brief Resize the fixed length buffers
+  ///
+  /// The buffers will be resized to hold at least `num_rows_new` rows of data
+  Status ResizeFixedLengthBuffers(int num_rows_new);
+  /// \brief Resize the varying length buffer if this array is a variable binary type
+  ///
+  /// This must be called after offsets have been populated and the buffer will be
+  /// resized to hold at least as much data as the offsets require
+  ///
+  /// Does nothing if the array is not a variable binary type
+  Status ResizeVaryingLengthBuffer();
+  /// \brief The current length (in rows) of the array
+  int num_rows() const { return num_rows_; }
+  /// \brief A non-owning view into this array
+  KeyColumnArray column_array() const;
+  /// \brief A lightweight descriptor of the data held by this array
+  KeyColumnMetadata column_metadata() const {
+    return ColumnMetadataFromDataType(data_type_);
+  }
+  /// \brief Convert the data to an arrow::ArrayData
+  ///
+  /// This is a zero copy operation and the created ArrayData will reference the
+  /// buffers held by this instance.
+  std::shared_ptr<ArrayData> array_data() const;
+  /// \brief A raw pointer to the requested buffer
+  ///
+  /// If i is 0 then this returns the validity buffer
+  /// If i is 1 then this returns the buffer used for values (if this is a fixed
+  ///           length data type) or offsets (if this is a variable binary type)
+  /// If i is 2 then this returns the buffer used for variable length binary data
+  uint8_t* mutable_data(int i) {
+    return i == 0   ? non_null_buf_->mutable_data()
+           : i == 1 ? fixed_len_buf_->mutable_data()
+                    : var_len_buf_->mutable_data();
+  }
+
+ private:
+  static constexpr int64_t kNumPaddingBytes = 64;
+  int log_num_rows_min_;
+  std::shared_ptr<DataType> data_type_;
+  MemoryPool* pool_;
+  int num_rows_;
+  int num_rows_allocated_;
+  int var_len_buf_size_;
+  std::shared_ptr<ResizableBuffer> non_null_buf_;
+  std::shared_ptr<ResizableBuffer> fixed_len_buf_;
+  std::shared_ptr<ResizableBuffer> var_len_buf_;

Review Comment:
   Both are fine. I can change this one to an array.
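
   For illustration, a minimal sketch of the array-of-buffers variant being discussed; the member and constant names below are hypothetical, not the final PR code:

       // Hypothetical sketch only (requires <array>): collapse the three owned
       // buffers into one std::array so accessors can index directly.
       static constexpr int kNumBuffers = 3;  // validity, fixed-length/offsets, var-length
       std::array<std::shared_ptr<ResizableBuffer>, kNumBuffers> buffers_;

       uint8_t* mutable_data(int i) {
         ARROW_DCHECK(i >= 0 && i < kNumBuffers);
         return buffers_[i]->mutable_data();
       }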





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851085686


##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray KeyColumnArray::WithBufferFrom(const KeyColumnArray& other,
+                                              int buffer_id_to_replace) const {
+  KeyColumnArray copy = *this;
+  copy.mutable_buffers_[buffer_id_to_replace] =
+      other.mutable_buffers_[buffer_id_to_replace];
+  copy.buffers_[buffer_id_to_replace] = other.buffers_[buffer_id_to_replace];
+  if (buffer_id_to_replace < max_buffers_ - 1) {
+    copy.bit_offset_[buffer_id_to_replace] = other.bit_offset_[buffer_id_to_replace];
+  }
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::WithMetadata(const KeyColumnMetadata& metadata) const {
+  KeyColumnArray copy = *this;
+  copy.metadata_ = metadata;
+  return copy;
+}
+
+KeyColumnArray KeyColumnArray::Slice(int64_t offset, int64_t length) const {
+  KeyColumnArray sliced;
+  sliced.metadata_ = metadata_;
+  sliced.length_ = length;
+  uint32_t fixed_size =
+      !metadata_.is_fixed_length ? sizeof(uint32_t) : metadata_.fixed_length;
+
+  sliced.buffers_[0] =
+      buffers_[0] ? buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.mutable_buffers_[0] =
+      mutable_buffers_[0] ? mutable_buffers_[0] + (bit_offset_[0] + offset) / 8 : nullptr;
+  sliced.bit_offset_[0] = (bit_offset_[0] + offset) % 8;
+
+  if (fixed_size == 0 && !metadata_.is_null_type) {
+    sliced.buffers_[1] =
+        buffers_[1] ? buffers_[1] + (bit_offset_[1] + offset) / 8 : nullptr;
+    sliced.mutable_buffers_[1] = mutable_buffers_[1]
+                                     ? mutable_buffers_[1] + (bit_offset_[1] + offset) / 8
+                                     : nullptr;
+    sliced.bit_offset_[1] = (bit_offset_[1] + offset) % 8;
+  } else {
+    sliced.buffers_[1] = buffers_[1] ? buffers_[1] + offset * fixed_size : nullptr;
+    sliced.mutable_buffers_[1] =
+        mutable_buffers_[1] ? mutable_buffers_[1] + offset * fixed_size : nullptr;
+    sliced.bit_offset_[1] = 0;
+  }
+
+  sliced.buffers_[2] = buffers_[2];
+  sliced.mutable_buffers_[2] = mutable_buffers_[2];
+  return sliced;
+}
+
+KeyColumnMetadata ColumnMetadataFromDataType(const std::shared_ptr<DataType>& type) {
+  if (type->id() == Type::DICTIONARY) {
+    auto bit_width =
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width();
+    ARROW_DCHECK(bit_width % 8 == 0);
+    return KeyColumnMetadata(true, bit_width / 8);
+  }
+  if (type->id() == Type::BOOL) {
+    return KeyColumnMetadata(true, 0);
+  }
+  if (is_fixed_width(type->id())) {
+    return KeyColumnMetadata(
+        true,
+        arrow::internal::checked_cast<const FixedWidthType&>(*type).bit_width() / 8);
+  }
+  if (is_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint32_t));
+  }
+  if (is_large_binary_like(type->id())) {
+    return KeyColumnMetadata(false, sizeof(uint64_t));
+  }
+  if (type->id() == Type::NA) {
+    return KeyColumnMetadata(true, 0, true);
+  }
+  // Should not reach this point, caller attempted to create a KeyColumnArray from an
+  // invalid type
+  ARROW_DCHECK(false);
+  return KeyColumnMetadata(true, sizeof(int));
+}
+
+KeyColumnArray ColumnArrayFromArrayData(const std::shared_ptr<ArrayData>& array_data,
+                                        int start_row, int num_rows) {
+  KeyColumnArray column_array = KeyColumnArray(
+      ColumnMetadataFromDataType(array_data->type),
+      array_data->offset + start_row + num_rows,
+      array_data->buffers[0] != NULLPTR ? array_data->buffers[0]->data() : nullptr,
+      array_data->buffers[1]->data(),
+      (array_data->buffers.size() > 2 && array_data->buffers[2] != NULLPTR)
+          ? array_data->buffers[2]->data()
+          : nullptr);
+  return column_array.Slice(array_data->offset + start_row, num_rows);
+}
+
+void ColumnMetadatasFromExecBatch(const ExecBatch& batch,
+                                  std::vector<KeyColumnMetadata>* column_metadatas) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_metadatas->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_metadatas)[i] = ColumnMetadataFromDataType(array_data->type);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch, int start_row, int num_rows,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  int num_columns = static_cast<int>(batch.values.size());
+  column_arrays->resize(num_columns);
+  for (int i = 0; i < num_columns; ++i) {
+    const Datum& data = batch.values[i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    (*column_arrays)[i] = ColumnArrayFromArrayData(array_data, start_row, num_rows);
+  }
+}
+
+void ColumnArraysFromExecBatch(const ExecBatch& batch,
+                               std::vector<KeyColumnArray>* column_arrays) {
+  ColumnArraysFromExecBatch(batch, 0, static_cast<int>(batch.length), column_arrays);
+}
+
+void ResizableArrayData::Init(const std::shared_ptr<DataType>& data_type,
+                              MemoryPool* pool, int log_num_rows_min) {
+#ifndef NDEBUG
+  if (num_rows_allocated_ > 0) {
+    ARROW_DCHECK(data_type_ != NULLPTR);
+    KeyColumnMetadata metadata_before = ColumnMetadataFromDataType(data_type_);
+    KeyColumnMetadata metadata_after = ColumnMetadataFromDataType(data_type);
+    ARROW_DCHECK(metadata_before.is_fixed_length == metadata_after.is_fixed_length &&
+                 metadata_before.fixed_length == metadata_after.fixed_length);
+  }
+#endif
+  Clear(/*release_buffers=*/false);
+  log_num_rows_min_ = log_num_rows_min;
+  data_type_ = data_type;
+  pool_ = pool;
+}
+
+void ResizableArrayData::Clear(bool release_buffers) {
+  num_rows_ = 0;
+  if (release_buffers) {
+    non_null_buf_.reset();
+    fixed_len_buf_.reset();
+    var_len_buf_.reset();
+    num_rows_allocated_ = 0;
+    var_len_buf_size_ = 0;
+  }
+}
+
+Status ResizableArrayData::ResizeFixedLengthBuffers(int num_rows_new) {
+  ARROW_DCHECK(num_rows_new >= 0);
+  if (num_rows_new <= num_rows_allocated_) {
+    num_rows_ = num_rows_new;
+    return Status::OK();
+  }
+
+  int num_rows_allocated_new = 1 << log_num_rows_min_;
+  while (num_rows_allocated_new < num_rows_new) {
+    num_rows_allocated_new *= 2;
+  }
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (fixed_len_buf_ == NULLPTR) {
+    ARROW_DCHECK(non_null_buf_ == NULLPTR && var_len_buf_ == NULLPTR);
+
+    ARROW_ASSIGN_OR_RAISE(
+        non_null_buf_,
+        AllocateResizableBuffer(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes, pool_));
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes,
+                pool_));
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            fixed_len_buf_,
+            AllocateResizableBuffer(
+                num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes,
+                pool_));
+      }
+    } else {
+      ARROW_ASSIGN_OR_RAISE(
+          fixed_len_buf_,
+          AllocateResizableBuffer(
+              (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes, pool_));
+    }
+
+    ARROW_ASSIGN_OR_RAISE(var_len_buf_, AllocateResizableBuffer(
+                                            sizeof(uint64_t) + kNumPaddingBytes, pool_));
+
+    var_len_buf_size_ = sizeof(uint64_t);
+  } else {
+    ARROW_DCHECK(non_null_buf_ != NULLPTR && var_len_buf_ != NULLPTR);
+
+    RETURN_NOT_OK(non_null_buf_->Resize(bit_util::BytesForBits(num_rows_allocated_new) +
+                                        kNumPaddingBytes));
+
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            bit_util::BytesForBits(num_rows_allocated_new) + kNumPaddingBytes));
+      } else {
+        RETURN_NOT_OK(fixed_len_buf_->Resize(
+            num_rows_allocated_new * column_metadata.fixed_length + kNumPaddingBytes));
+      }
+    } else {
+      RETURN_NOT_OK(fixed_len_buf_->Resize(
+          (num_rows_allocated_new + 1) * sizeof(uint32_t) + kNumPaddingBytes));
+    }
+  }
+
+  num_rows_allocated_ = num_rows_allocated_new;
+  num_rows_ = num_rows_new;
+
+  return Status::OK();
+}
+
+Status ResizableArrayData::ResizeVaryingLengthBuffer() {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  if (!column_metadata.is_fixed_length) {
+    int min_new_size = static_cast<int>(
+        reinterpret_cast<const uint32_t*>(fixed_len_buf_->data())[num_rows_]);
+    ARROW_DCHECK(var_len_buf_size_ > 0);
+    if (var_len_buf_size_ < min_new_size) {
+      int new_size = var_len_buf_size_;
+      while (new_size < min_new_size) {
+        new_size *= 2;
+      }
+      RETURN_NOT_OK(var_len_buf_->Resize(new_size + kNumPaddingBytes));
+      var_len_buf_size_ = new_size;
+    }
+  }
+
+  return Status::OK();
+}
+
+KeyColumnArray ResizableArrayData::column_array() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+  return KeyColumnArray(column_metadata, num_rows_, non_null_buf_->mutable_data(),
+                        fixed_len_buf_->mutable_data(), var_len_buf_->mutable_data());
+}
+
+std::shared_ptr<ArrayData> ResizableArrayData::array_data() const {
+  KeyColumnMetadata column_metadata;
+  column_metadata = ColumnMetadataFromDataType(data_type_);
+
+  auto valid_count = arrow::internal::CountSetBits(non_null_buf_->data(), /*offset=*/0,
+                                                   static_cast<int64_t>(num_rows_));
+  int null_count = static_cast<int>(num_rows_) - static_cast<int>(valid_count);
+
+  if (column_metadata.is_fixed_length) {
+    return ArrayData::Make(data_type_, num_rows_, {non_null_buf_, fixed_len_buf_},
+                           null_count);
+  } else {
+    return ArrayData::Make(data_type_, num_rows_,
+                           {non_null_buf_, fixed_len_buf_, var_len_buf_}, null_count);
+  }
+}
+
+int ExecBatchBuilder::NumRowsToSkip(const std::shared_ptr<ArrayData>& column,
+                                    int num_rows, const uint16_t* row_ids,
+                                    int num_tail_bytes_to_skip) {
+#ifndef NDEBUG
+  // Ids must be in non-decreasing order
+  //
+  for (int i = 1; i < num_rows; ++i) {
+    ARROW_DCHECK(row_ids[i] >= row_ids[i - 1]);
+  }
+#endif
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(column->type);
+
+  int num_rows_left = num_rows;
+  int num_bytes_skipped = 0;
+  while (num_rows_left > 0 && num_bytes_skipped < num_tail_bytes_to_skip) {
+    if (column_metadata.is_fixed_length) {
+      if (column_metadata.fixed_length == 0) {
+        num_rows_left = std::max(num_rows_left, 8) - 8;
+        ++num_bytes_skipped;
+      } else {
+        --num_rows_left;
+        num_bytes_skipped += column_metadata.fixed_length;
+      }
+    } else {
+      --num_rows_left;
+      int row_id_removed = row_ids[num_rows_left];
+      const uint32_t* offsets =
+          reinterpret_cast<const uint32_t*>(column->buffers[1]->data());
+      num_bytes_skipped += offsets[row_id_removed + 1] - offsets[row_id_removed];
+    }
+  }
+
+  return num_rows - num_rows_left;
+}
+
+template <bool OUTPUT_BYTE_ALIGNED>
+void ExecBatchBuilder::CollectBitsImp(const uint8_t* input_bits,
+                                      int64_t input_bits_offset, uint8_t* output_bits,
+                                      int64_t output_bits_offset, int num_rows,
+                                      const uint16_t* row_ids) {
+  if (!OUTPUT_BYTE_ALIGNED) {
+    ARROW_DCHECK(output_bits_offset % 8 > 0);
+    output_bits[output_bits_offset / 8] &=
+        static_cast<uint8_t>((1 << (output_bits_offset % 8)) - 1);
+  } else {
+    ARROW_DCHECK(output_bits_offset % 8 == 0);
+  }
+  constexpr int unroll = 8;
+  for (int i = 0; i < num_rows / unroll; ++i) {
+    const uint16_t* row_ids_base = row_ids + unroll * i;
+    uint8_t result;
+    result = bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[0]) ? 1 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[1]) ? 2 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[2]) ? 4 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[3]) ? 8 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[4]) ? 16 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[5]) ? 32 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[6]) ? 64 : 0;
+    result |= bit_util::GetBit(input_bits, input_bits_offset + row_ids_base[7]) ? 128 : 0;
+    if (OUTPUT_BYTE_ALIGNED) {
+      output_bits[output_bits_offset / 8 + i] = result;
+    } else {
+      output_bits[output_bits_offset / 8 + i] |=
+          static_cast<uint8_t>(result << (output_bits_offset % 8));
+      output_bits[output_bits_offset / 8 + i + 1] =
+          static_cast<uint8_t>(result >> (8 - (output_bits_offset % 8)));
+    }
+  }
+  if (num_rows % unroll > 0) {
+    for (int i = num_rows - (num_rows % unroll); i < num_rows; ++i) {
+      bit_util::SetBitTo(output_bits, output_bits_offset + i,
+                         bit_util::GetBit(input_bits, input_bits_offset + row_ids[i]));
+    }
+  }
+}
+
+void ExecBatchBuilder::CollectBits(const uint8_t* input_bits, int64_t input_bits_offset,
+                                   uint8_t* output_bits, int64_t output_bits_offset,
+                                   int num_rows, const uint16_t* row_ids) {
+  if (output_bits_offset % 8 > 0) {
+    CollectBitsImp<false>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                          num_rows, row_ids);
+  } else {
+    CollectBitsImp<true>(input_bits, input_bits_offset, output_bits, output_bits_offset,
+                         num_rows, row_ids);
+  }
+}
+
+template <class PROCESS_VALUE_FN>
+void ExecBatchBuilder::Visit(const std::shared_ptr<ArrayData>& column, int num_rows,
+                             const uint16_t* row_ids, PROCESS_VALUE_FN process_value_fn) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(column->type);
+
+  if (!metadata.is_fixed_length) {
+    const uint8_t* ptr_base = column->buffers[2]->data();
+    const uint32_t* offsets =
+        reinterpret_cast<const uint32_t*>(column->buffers[1]->data()) + column->offset;
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr = ptr_base + offsets[row_id];
+      uint32_t field_length = offsets[row_id + 1] - offsets[row_id];
+      process_value_fn(i, field_ptr, field_length);
+    }
+  } else {
+    ARROW_DCHECK(metadata.fixed_length > 0);
+    for (int i = 0; i < num_rows; ++i) {
+      uint16_t row_id = row_ids[i];
+      const uint8_t* field_ptr =
+          column->buffers[1]->data() +
+          (column->offset + row_id) * static_cast<int64_t>(metadata.fixed_length);
+      process_value_fn(i, field_ptr, metadata.fixed_length);
+    }
+  }
+}
+
+Status ExecBatchBuilder::AppendSelected(const std::shared_ptr<ArrayData>& source,
+                                        ResizableArrayData* target,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        MemoryPool* pool) {
+  int num_rows_before = target->num_rows();
+  ARROW_DCHECK(num_rows_before >= 0);
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target->num_rows() == 0) {
+    target->Init(source->type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target->ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(source->type);
+
+  if (column_metadata.is_fixed_length) {
+    // Fixed length column
+    //
+    uint32_t fixed_length = column_metadata.fixed_length;
+    switch (fixed_length) {
+      case 0:
+        CollectBits(source->buffers[1]->data(), source->offset, target->mutable_data(1),
+                    num_rows_before, num_rows_to_append, row_ids);
+        break;
+      case 1:
+        Visit(source, num_rows_to_append, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                target->mutable_data(1)[num_rows_before + i] = *ptr;
+              });
+        break;
+      case 2:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint16_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint16_t*>(ptr);
+            });
+        break;
+      case 4:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint32_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint32_t*>(ptr);
+            });
+        break;
+      case 8:
+        Visit(
+            source, num_rows_to_append, row_ids,
+            [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+              reinterpret_cast<uint64_t*>(target->mutable_data(1))[num_rows_before + i] =
+                  *reinterpret_cast<const uint64_t*>(ptr);
+            });
+        break;
+      default: {
+        int num_rows_to_process =
+            num_rows_to_append -
+            NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+        Visit(source, num_rows_to_process, row_ids,
+              [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                uint64_t* dst = reinterpret_cast<uint64_t*>(
+                    target->mutable_data(1) +
+                    static_cast<int64_t>(num_bytes) * (num_rows_before + i));
+                const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                for (uint32_t word_id = 0;
+                     word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t));
+                     ++word_id) {
+                  util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+                }
+              });
+        if (num_rows_to_append > num_rows_to_process) {
+          Visit(source, num_rows_to_append - num_rows_to_process,
+                row_ids + num_rows_to_process,
+                [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+                  uint64_t* dst = reinterpret_cast<uint64_t*>(
+                      target->mutable_data(1) +
+                      static_cast<int64_t>(num_bytes) *
+                          (num_rows_before + num_rows_to_process + i));
+                  const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+                  memcpy(dst, src, num_bytes);
+                });
+        }
+      }
+    }
+  } else {
+    // Varying length column
+    //
+
+    // Step 1: calculate target offsets
+    //
+    uint32_t* offsets = reinterpret_cast<uint32_t*>(target->mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : offsets[num_rows_before];
+    Visit(source, num_rows_to_append, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            offsets[num_rows_before + i] = num_bytes;
+          });
+    for (int i = 0; i < num_rows_to_append; ++i) {
+      uint32_t length = offsets[num_rows_before + i];
+      offsets[num_rows_before + i] = sum;
+      sum += length;
+    }
+    offsets[num_rows_before + num_rows_to_append] = sum;
+
+    // Step 2: resize output buffers
+    //
+    RETURN_NOT_OK(target->ResizeVaryingLengthBuffer());
+
+    // Step 3: copy varying-length data
+    //
+    int num_rows_to_process =
+        num_rows_to_append -
+        NumRowsToSkip(source, num_rows_to_append, row_ids, sizeof(uint64_t));
+    Visit(source, num_rows_to_process, row_ids,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(target->mutable_data(2) +
+                                                        offsets[num_rows_before + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            for (uint32_t word_id = 0;
+                 word_id < bit_util::CeilDiv(num_bytes, sizeof(uint64_t)); ++word_id) {
+              util::SafeStore<uint64_t>(dst + word_id, util::SafeLoad(src + word_id));
+            }
+          });
+    Visit(source, num_rows_to_append - num_rows_to_process, row_ids + num_rows_to_process,
+          [&](int i, const uint8_t* ptr, uint32_t num_bytes) {
+            uint64_t* dst = reinterpret_cast<uint64_t*>(
+                target->mutable_data(2) +
+                offsets[num_rows_before + num_rows_to_process + i]);
+            const uint64_t* src = reinterpret_cast<const uint64_t*>(ptr);
+            memcpy(dst, src, num_bytes);
+          });
+  }
+
+  // Process nulls
+  //
+  if (source->buffers[0] == NULLPTR) {
+    uint8_t* dst = target->mutable_data(0);
+    dst[num_rows_before / 8] |= static_cast<uint8_t>(~0ULL << (num_rows_before & 7));
+    for (int i = num_rows_before / 8 + 1;
+         i < bit_util::BytesForBits(num_rows_before + num_rows_to_append); ++i) {
+      dst[i] = 0xff;
+    }
+  } else {
+    CollectBits(source->buffers[0]->data(), source->offset, target->mutable_data(0),
+                num_rows_before, num_rows_to_append, row_ids);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(const std::shared_ptr<DataType>& type,
+                                     ResizableArrayData& target, int num_rows_to_append,
+                                     MemoryPool* pool) {
+  int num_rows_before = target.num_rows();
+  int num_rows_after = num_rows_before + num_rows_to_append;
+  if (target.num_rows() == 0) {
+    target.Init(type, pool, kLogNumRows);
+  }
+  RETURN_NOT_OK(target.ResizeFixedLengthBuffers(num_rows_after));
+
+  KeyColumnMetadata column_metadata = ColumnMetadataFromDataType(type);
+
+  // Process fixed length buffer
+  //
+  if (column_metadata.is_fixed_length) {
+    uint8_t* dst = target.mutable_data(1);
+    if (column_metadata.fixed_length == 0) {
+      dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+      int64_t offset_begin = num_rows_before / 8 + 1;
+      int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+      if (offset_end > offset_begin) {
+        memset(dst + offset_begin, 0, offset_end - offset_begin);
+      }
+    } else {
+      memset(dst + num_rows_before * static_cast<int64_t>(column_metadata.fixed_length),
+             0, static_cast<int64_t>(column_metadata.fixed_length) * num_rows_to_append);
+    }
+  } else {
+    uint32_t* dst = reinterpret_cast<uint32_t*>(target.mutable_data(1));
+    uint32_t sum = num_rows_before == 0 ? 0 : dst[num_rows_before];
+    for (int64_t i = num_rows_before; i <= num_rows_after; ++i) {
+      dst[i] = sum;
+    }
+  }
+
+  // Process nulls
+  //
+  uint8_t* dst = target.mutable_data(0);
+  dst[num_rows_before / 8] &= static_cast<uint8_t>((1 << (num_rows_before % 8)) - 1);
+  int64_t offset_begin = num_rows_before / 8 + 1;
+  int64_t offset_end = bit_util::BytesForBits(num_rows_after);
+  if (offset_end > offset_begin) {
+    memset(dst + offset_begin, 0, offset_end - offset_begin);
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int num_cols, const int* col_ids) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(num_cols);
+    for (int i = 0; i < num_cols; ++i) {
+      const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+      ARROW_DCHECK(data.is_array());
+      const std::shared_ptr<ArrayData>& array_data = data.array();
+      values_[i].Init(array_data->type, pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    const Datum& data = batch.values[col_ids ? col_ids[i] : i];
+    ARROW_DCHECK(data.is_array());
+    const std::shared_ptr<ArrayData>& array_data = data.array();
+    RETURN_NOT_OK(
+        AppendSelected(array_data, &values_[i], num_rows_to_append, row_ids, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendSelected(MemoryPool* pool, const ExecBatch& batch,
+                                        int num_rows_to_append, const uint16_t* row_ids,
+                                        int* num_appended, int num_cols,
+                                        const int* col_ids) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendSelected(pool, batch, num_rows_next, row_ids, num_cols, col_ids));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append) {
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+
+  // If this is the first time we append rows, then initialize output buffers.
+  //
+  if (values_.empty()) {
+    values_.resize(types.size());
+    for (size_t i = 0; i < types.size(); ++i) {
+      values_[i].Init(types[i], pool, kLogNumRows);
+    }
+  }
+
+  for (size_t i = 0; i < values_.size(); ++i) {
+    RETURN_NOT_OK(AppendNulls(types[i], values_[i], num_rows_to_append, pool));
+  }
+
+  return Status::OK();
+}
+
+Status ExecBatchBuilder::AppendNulls(MemoryPool* pool,
+                                     const std::vector<std::shared_ptr<DataType>>& types,
+                                     int num_rows_to_append, int* num_appended) {
+  *num_appended = 0;
+  if (num_rows_to_append == 0) {
+    return Status::OK();
+  }
+  int num_rows_max = 1 << kLogNumRows;
+  int num_rows_present = num_rows();
+  if (num_rows_present >= num_rows_max) {
+    return Status::OK();
+  }
+  int num_rows_available = num_rows_max - num_rows_present;
+  int num_rows_next = std::min(num_rows_available, num_rows_to_append);
+  RETURN_NOT_OK(AppendNulls(pool, types, num_rows_next));
+  *num_appended = num_rows_next;
+  return Status::OK();
+}
+
+ExecBatch ExecBatchBuilder::Flush() {
+  ARROW_DCHECK(num_rows() > 0);

Review Comment:
   It is a very simple assertion for the caller to maintain; the caller always uses the pattern: "if (b.num_rows() > 0) { auto batch = b.Flush(); /* output batch */ }".
   
   Returning an empty batch is not implemented; the current code will not work in that case.
   
   If you prefer to have Flush() return some kind of optional result, let me know what you would like the function signature to look like.
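
   As a minimal sketch of that caller-side pattern (the surrounding driver code is assumed for illustration):

       // Hypothetical caller sketch: Flush() is only called when the builder
       // actually holds rows, matching the assertion inside Flush().
       ExecBatchBuilder builder;
       // ... AppendSelected() / AppendNulls() calls fill the builder ...
       if (builder.num_rows() > 0) {
         ExecBatch batch = builder.Flush();
         // hand off `batch` to the downstream consumer (output queue, callback, ...)
       }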





[GitHub] [arrow] westonpace commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1101942706

   I've added ARROW-16224 as the follow-up to add support for large binary types and dictionary-encoded types as key columns.




[GitHub] [arrow] lidavidm commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r850827924


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.

Review Comment:
   How do these compare to a theoretical ExecBatch refactor?



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The offset (in rows) must be divisible by 8 so that bit-vector buffers
+  /// are not split within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {

Review Comment:
   Given there's already `offsets`, maybe it would be better to just have 3/6 accessors and not restrict the accessors based on is_fixed_length.
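
   A minimal sketch of what such dedicated accessors could look like (the names below are hypothetical, not an agreed design):

       // Hypothetical sketch: one named accessor per buffer, three const and three
       // mutable, with no DCHECK tied to is_fixed_length.
       const uint8_t* validity() const { return data(0); }
       const uint8_t* fixed_length_data() const { return data(1); }
       const uint8_t* var_length_data() const { return data(2); }
       uint8_t* mutable_validity() { return mutable_data(0); }
       uint8_t* mutable_fixed_length_data() { return mutable_data(1); }
       uint8_t* mutable_var_length_data() { return mutable_data(2); }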



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.

Review Comment:
   I suppose these are far too limited in terms of data type support



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The offset (in rows) must be divisible by 8 so that bit-vector buffers
+  /// are not split within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)

Review Comment:
   Is this worth it compared to the usual Result<> pattern?
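
   For reference, a minimal sketch of the Result<>-returning alternative being suggested here (a hypothetical signature, not what the PR implements):

       // Hypothetical alternative: surface unsupported types as a Status error
       // instead of DCHECK-ing and returning a meaningless value.
       Result<KeyColumnMetadata> ColumnMetadataFromDataType(
           const std::shared_ptr<DataType>& type);

       // Caller side:
       ARROW_ASSIGN_OR_RAISE(KeyColumnMetadata metadata,
                             ColumnMetadataFromDataType(array_data->type));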



##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};

Review Comment:
   Why are these commented out?



##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};
+
+TEST(KeyColumnMetadata, FromDataType) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean());
+  ASSERT_EQ(0, metadata.fixed_length);
+  ASSERT_EQ(true, metadata.is_fixed_length);
+  ASSERT_EQ(false, metadata.is_null_type);
+
+  metadata = ColumnMetadataFromDataType(null());
+  ASSERT_EQ(true, metadata.is_null_type);
+
+  for (const auto& type : kSampleFixedDataTypes) {
+    int byte_width =
+        internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(byte_width, metadata.fixed_length);
+    ASSERT_EQ(true, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {binary(), utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(4, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {large_binary(), large_utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(8, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+}
+
+TEST(KeyColumnArray, FromArrayData) {
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
+    // source array) while `offset` is the offset we pass when constructing the
+    // KeyColumnArray
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array;
+        int byte_width =
+            internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+        if (is_decimal(type->id())) {
+          array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])");
+        } else {
+          array = ArrayFromJSON(type, "[1, 2, null]");
+        }
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * byte_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        // Maximum tested offset is < 8 so validity is just bit offset
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(nullptr, kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBinary) {
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
+        int offsets_width =
+            static_cast<int>(internal::checked_pointer_cast<BaseBinaryType>(type)
+                                 ->layout()
+                                 .buffers[1]
+                                 .byte_width);
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * offsets_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBool) {
+  for (auto array_offset : {0, 1}) {
+    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+    for (auto offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+      std::shared_ptr<Array> array = ArrayFromJSON(boolean(), "[true, false, null]");
+      array = array->Slice(array_offset);
+      int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+      KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1));
+      ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+      ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1));
+      ASSERT_EQ(length, kc_array.length());
+      ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+      ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+    }
+  }
+}
+
+// TEST(KeyColumnArray, FromArrayDataNull) {

Review Comment:
   I also see above that dictionary type may be handled?





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851077893


##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};
+
+TEST(KeyColumnMetadata, FromDataType) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean());
+  ASSERT_EQ(0, metadata.fixed_length);
+  ASSERT_EQ(true, metadata.is_fixed_length);
+  ASSERT_EQ(false, metadata.is_null_type);
+
+  metadata = ColumnMetadataFromDataType(null());
+  ASSERT_EQ(true, metadata.is_null_type);
+
+  for (const auto& type : kSampleFixedDataTypes) {
+    int byte_width =
+        internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(byte_width, metadata.fixed_length);
+    ASSERT_EQ(true, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {binary(), utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(4, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {large_binary(), large_utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(8, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+}
+
+TEST(KeyColumnArray, FromArrayData) {
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
+    // source array) while `offset` is the offset we pass when constructing the
+    // KeyColumnArray
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array;
+        int byte_width =
+            internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+        if (is_decimal(type->id())) {
+          array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])");
+        } else {
+          array = ArrayFromJSON(type, "[1, 2, null]");
+        }
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * byte_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        // Maximum tested offset is < 8 so validity is just bit offset
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(nullptr, kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBinary) {
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
+        int offsets_width =
+            static_cast<int>(internal::checked_pointer_cast<BaseBinaryType>(type)
+                                 ->layout()
+                                 .buffers[1]
+                                 .byte_width);
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * offsets_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBool) {
+  for (auto array_offset : {0, 1}) {
+    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+    for (auto offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+      std::shared_ptr<Array> array = ArrayFromJSON(boolean(), "[true, false, null]");
+      array = array->Slice(array_offset);
+      int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+      KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1));
+      ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+      ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1));
+      ASSERT_EQ(length, kc_array.length());
+      ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+      ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+    }
+  }
+}
+
+// TEST(KeyColumnArray, FromArrayDataNull) {

Review Comment:
   I removed this part. 





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r853572753


##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               uint8_t* buffer0, uint8_t* buffer1, uint8_t* buffer2,
+                               int bit_offset0, int bit_offset1) {
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = mutable_buffers_[0] = buffer0;
+  buffers_[1] = mutable_buffers_[1] = buffer1;
+  buffers_[2] = mutable_buffers_[2] = buffer2;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}

Review Comment:
   done



##########
cpp/src/arrow/compute/light_array.cc:
##########
@@ -0,0 +1,723 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <type_traits>
+
+#include "arrow/util/bitmap_ops.h"
+
+namespace arrow {
+namespace compute {
+
+KeyColumnArray::KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                               const uint8_t* buffer0, const uint8_t* buffer1,
+                               const uint8_t* buffer2, int bit_offset0, int bit_offset1) {
+  static_assert(std::is_pod<KeyColumnArray>::value,
+                "This class was intended to be a POD type");
+  metadata_ = metadata;
+  length_ = length;
+  buffers_[0] = buffer0;
+  buffers_[1] = buffer1;
+  buffers_[2] = buffer2;
+  mutable_buffers_[0] = mutable_buffers_[1] = mutable_buffers_[2] = nullptr;
+  bit_offset_[0] = bit_offset0;
+  bit_offset_[1] = bit_offset1;
+}

Review Comment:
   done



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);

Review Comment:
   done





[GitHub] [arrow] github-actions[bot] commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1097585985

   https://issues.apache.org/jira/browse/ARROW-16166




[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851134165


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)

Review Comment:
   I changed it to return Result<>. 
   
   In hash join and hash group by, data type checks are done during ExecNode initialization, so returning Result<> introduces redundant status checks in many places in their code.
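   
   As a minimal sketch of what that looks like on the caller side (hypothetical caller code, not part of this PR; it only assumes ColumnMetadataFromDataType now returns Result<KeyColumnMetadata> as described above):
   
       #include <memory>
       
       #include "arrow/compute/light_array.h"
       #include "arrow/result.h"
       #include "arrow/status.h"
       
       arrow::Status DescribeKeyColumn(const std::shared_ptr<arrow::DataType>& type) {
         // Propagate unsupported types as an error instead of relying on a DCHECK.
         ARROW_ASSIGN_OR_RAISE(arrow::compute::KeyColumnMetadata metadata,
                               arrow::compute::ColumnMetadataFromDataType(type));
         if (metadata.is_fixed_length && metadata.fixed_length == 0) {
           // Bit-per-value column (e.g. boolean).
         }
         return arrow::Status::OK();
       }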





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851079678


##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};

Review Comment:
   Also, dictionary types are not supported in KeyColumn... (the hash table works either with the dictionary's indices or with decoded values). Once the new hash join is ready, there will be a follow-up change to port dictionary type support from the old hash join to the new one, which will not have it initially. At that point some of the code from this PR may need to be updated.
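   
   Purely as an illustration of the "indices or decoded values" point (hypothetical usage, not code from this PR; it reuses ColumnArrayFromArrayData with the signature used in the tests above), a caller holding a DictionaryArray today would view its index array rather than the dictionary column itself:
   
       #include <memory>
       
       #include "arrow/array.h"
       #include "arrow/compute/light_array.h"
       
       arrow::compute::KeyColumnArray ViewDictionaryIndices(
           const std::shared_ptr<arrow::Array>& array) {
         // `array` is assumed to be of dictionary type.
         auto dict_array = std::static_pointer_cast<arrow::DictionaryArray>(array);
         std::shared_ptr<arrow::Array> indices = dict_array->indices();
         // The indices form a plain integer array, so they can be viewed as a
         // KeyColumnArray; the dictionary values themselves would be handled
         // separately (e.g. by decoding them first).
         return arrow::compute::ColumnArrayFromArrayData(
             indices->data(), /*offset=*/0, static_cast<int>(indices->length()));
       }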





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r853573176


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);

Review Comment:
   done



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+  // Constants used for accessing buffers using data() and mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));

Review Comment:
   done



##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct ARROW_EXPORT KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning, indicating a bit vector with one bit per value if it
+  /// isn't a null type column.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class ARROW_EXPORT KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+
+  // Constants used for accessing buffers using data() and mutable_data().
+  static constexpr int kValidityBuffer = 0;
+  static constexpr int kFixedLengthBuffer = 1;
+  static constexpr int kVariableLengthBuffer = 2;
+
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i <= kMaxBuffers);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));

Review Comment:
   done





[GitHub] [arrow] lidavidm closed pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output
URL: https://github.com/apache/arrow/pull/12872




[GitHub] [arrow] westonpace commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1104881812

   @lidavidm Any further thoughts?




[GitHub] [arrow] westonpace commented on pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #12872:
URL: https://github.com/apache/arrow/pull/12872#issuecomment-1099604844

   @pitrou @lidavidm I'd appreciate your eyes on this if you have time.  I think these utilities could be generally useful outside of hash-join and it might be nice to have some more eyes on them.




[GitHub] [arrow] westonpace commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r850768949


##########
cpp/src/arrow/compute/light_array_test.cc:
##########
@@ -0,0 +1,504 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/compute/light_array.h"
+
+#include <gtest/gtest.h>
+
+#include "arrow/compute/exec/test_util.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace compute {
+
+const std::vector<std::shared_ptr<DataType>> kSampleFixedDataTypes = {
+    int8(),   int16(),  int32(),  int64(),           uint8(),
+    uint16(), uint32(), uint64(), decimal128(38, 6), decimal256(76, 6)};
+const std::vector<std::shared_ptr<DataType>> kSampleBinaryTypes = {
+    utf8(), binary() /*, large_utf8(), large_binary()*/};
+
+TEST(KeyColumnMetadata, FromDataType) {
+  KeyColumnMetadata metadata = ColumnMetadataFromDataType(boolean());
+  ASSERT_EQ(0, metadata.fixed_length);
+  ASSERT_EQ(true, metadata.is_fixed_length);
+  ASSERT_EQ(false, metadata.is_null_type);
+
+  metadata = ColumnMetadataFromDataType(null());
+  ASSERT_EQ(true, metadata.is_null_type);
+
+  for (const auto& type : kSampleFixedDataTypes) {
+    int byte_width =
+        internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(byte_width, metadata.fixed_length);
+    ASSERT_EQ(true, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {binary(), utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(4, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+
+  for (const auto& type : {large_binary(), large_utf8()}) {
+    metadata = ColumnMetadataFromDataType(type);
+    ASSERT_EQ(8, metadata.fixed_length);
+    ASSERT_EQ(false, metadata.is_fixed_length);
+    ASSERT_EQ(false, metadata.is_null_type);
+  }
+}
+
+TEST(KeyColumnArray, FromArrayData) {
+  for (const auto& type : kSampleFixedDataTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    // `array_offset` is the offset of the source array (e.g. if we are given a sliced
+    // source array) while `offset` is the offset we pass when constructing the
+    // KeyColumnArray
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array;
+        int byte_width =
+            internal::checked_pointer_cast<FixedWidthType>(type)->bit_width() / 8;
+        if (is_decimal(type->id())) {
+          array = ArrayFromJSON(type, R"(["1.123123", "2.123123", null])");
+        } else {
+          array = ArrayFromJSON(type, "[1, 2, null]");
+        }
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * byte_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        // Maximum tested offset is < 8 so validity is just bit offset
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(nullptr, kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBinary) {
+  for (const auto& type : kSampleBinaryTypes) {
+    ARROW_SCOPED_TRACE("Type: ", type->ToString());
+    for (auto array_offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+      for (auto offset : {0, 1}) {
+        ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+        std::shared_ptr<Array> array = ArrayFromJSON(type, R"(["xyz", "abcabc", null])");
+        int offsets_width =
+            static_cast<int>(internal::checked_pointer_cast<BaseBinaryType>(type)
+                                 ->layout()
+                                 .buffers[1]
+                                 .byte_width);
+        array = array->Slice(array_offset);
+        int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+        int buffer_offset_bytes = (offset + array_offset) * offsets_width;
+        KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+        ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+        ASSERT_EQ(0, kc_array.bit_offset(1));
+        ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+        ASSERT_EQ(array->data()->buffers[1]->data() + buffer_offset_bytes,
+                  kc_array.data(1));
+        ASSERT_EQ(array->data()->buffers[2]->data(), kc_array.data(2));
+        ASSERT_EQ(length, kc_array.length());
+        // When creating from ArrayData always create read-only
+        ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+        ASSERT_EQ(nullptr, kc_array.mutable_data(2));
+      }
+    }
+  }
+}
+
+TEST(KeyColumnArray, FromArrayDataBool) {
+  for (auto array_offset : {0, 1}) {
+    ARROW_SCOPED_TRACE("Array offset: ", array_offset);
+    for (auto offset : {0, 1}) {
+      ARROW_SCOPED_TRACE("Constructor offset: ", offset);
+      std::shared_ptr<Array> array = ArrayFromJSON(boolean(), "[true, false, null]");
+      array = array->Slice(array_offset);
+      int length = static_cast<int32_t>(array->length()) - offset - array_offset;
+      KeyColumnArray kc_array = ColumnArrayFromArrayData(array->data(), offset, length);
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(0));
+      ASSERT_EQ(offset + array_offset, kc_array.bit_offset(1));
+      ASSERT_EQ(array->data()->buffers[0]->data(), kc_array.data(0));
+      ASSERT_EQ(array->data()->buffers[1]->data(), kc_array.data(1));
+      ASSERT_EQ(length, kc_array.length());
+      ASSERT_EQ(nullptr, kc_array.mutable_data(0));
+      ASSERT_EQ(nullptr, kc_array.mutable_data(1));
+    }
+  }
+}
+
+// TEST(KeyColumnArray, FromArrayDataNull) {

Review Comment:
   Ah, my mistake.  This test fails, by the way, so I wasn't sure whether that was something we wanted to address in this PR or file a follow-up JIRA for.  I think the hash-join PR doesn't operate on null columns, so it doesn't need it, but since we have specific code branches to support null in a few spots, it seemed like we would want to support it at some point.
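   
   For what it's worth, a tiny sketch of the metadata side of the null-column case (hypothetical values, based only on the KeyColumnMetadata/KeyColumnArray definitions in this PR; treating the null type as fixed-length with fixed_length == 0 is an assumption, not something the PR pins down):
   
       #include "arrow/compute/light_array.h"
       
       // An all-null key column carries no buffers; only is_null_type and the
       // row count are meaningful.
       arrow::compute::KeyColumnMetadata null_metadata(
           /*is_fixed_length_in=*/true, /*fixed_length_in=*/0, /*is_null_type_in=*/true);
       const uint8_t* no_buffer = nullptr;
       arrow::compute::KeyColumnArray null_column(null_metadata, /*length=*/3,
                                                  no_buffer, no_buffer, no_buffer);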





[GitHub] [arrow] michalursa commented on a diff in pull request #12872: ARROW-16166: [C++][Compute] Utilities for assembling join output

Posted by GitBox <gi...@apache.org>.
michalursa commented on code in PR #12872:
URL: https://github.com/apache/arrow/pull/12872#discussion_r851078401


##########
cpp/src/arrow/compute/light_array.h:
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "arrow/array.h"
+#include "arrow/compute/exec.h"
+#include "arrow/type.h"
+#include "arrow/util/logging.h"
+
+/// This file contains lightweight containers for Arrow buffers.  These containers
+/// make compromises in terms of strong ownership and the range of data types supported
+/// in order to gain performance and reduce overhead.
+
+namespace arrow {
+namespace compute {
+
+/// \brief Description of the layout of a "key" column
+///
+/// A "key" column is a non-nested, non-union column.
+/// Every key column has either 0 (null), 2 (e.g. int32) or 3 (e.g. string) buffers
+/// and no children.
+///
+/// This metadata object is a zero-allocation analogue of arrow::DataType
+struct KeyColumnMetadata {
+  KeyColumnMetadata() = default;
+  KeyColumnMetadata(bool is_fixed_length_in, uint32_t fixed_length_in,
+                    bool is_null_type_in = false)
+      : is_fixed_length(is_fixed_length_in),
+        is_null_type(is_null_type_in),
+        fixed_length(fixed_length_in) {}
+  /// \brief True if the column is not a varying-length binary type
+  ///
+  /// If this is true the column will have a validity buffer and
+  /// a data buffer and the third buffer will be unused.
+  bool is_fixed_length;
+  /// \brief True if this column is the null type
+  bool is_null_type;
+  /// \brief The number of bytes for each item
+  ///
+  /// Zero has a special meaning: unless this is a null type column, it indicates a
+  /// bit vector with one bit per value.
+  ///
+  /// For a varying-length binary column this represents the number of bytes per offset.
+  uint32_t fixed_length;
+};
+
+/// \brief A lightweight view into a "key" array
+///
+/// A "key" column is a non-nested, non-union column \see KeyColumnMetadata
+///
+/// This metadata object is a zero-allocation analogue of arrow::ArrayData
+class KeyColumnArray {
+ public:
+  /// \brief Create an uninitialized KeyColumnArray
+  KeyColumnArray() = default;
+  /// \brief Create a read-only view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length,
+                 const uint8_t* buffer0, const uint8_t* buffer1, const uint8_t* buffer2,
+                 int bit_offset0 = 0, int bit_offset1 = 0);
+  /// \brief Create a mutable view from buffers
+  ///
+  /// This is a view only and does not take ownership of the buffers.  The lifetime
+  /// of the buffers must exceed the lifetime of this view
+  KeyColumnArray(const KeyColumnMetadata& metadata, int64_t length, uint8_t* buffer0,
+                 uint8_t* buffer1, uint8_t* buffer2, int bit_offset0 = 0,
+                 int bit_offset1 = 0);
+  /// \brief Create a sliced view of `this`
+  ///
+  /// The number of rows used in offset must be divisible by 8
+  /// in order to not split bit vectors within a single byte.
+  KeyColumnArray Slice(int64_t offset, int64_t length) const;
+  /// \brief Create a copy of `this` with a buffer from `other`
+  ///
+  /// The copy will be identical to `this` except the buffer at buffer_id_to_replace
+  /// will be replaced by the corresponding buffer in `other`.
+  KeyColumnArray WithBufferFrom(const KeyColumnArray& other,
+                                int buffer_id_to_replace) const;
+
+  /// \brief Create a copy of `this` with new metadata
+  KeyColumnArray WithMetadata(const KeyColumnMetadata& metadata) const;
+  /// \brief Return one of the underlying mutable buffers
+  uint8_t* mutable_data(int i) {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return mutable_buffers_[i];
+  }
+  /// \brief Return one of the underlying read-only buffers
+  const uint8_t* data(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return buffers_[i];
+  }
+  /// \brief Return a mutable version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  uint32_t* mutable_offsets() {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<uint32_t*>(mutable_data(1));
+  }
+  /// \brief Return a read-only version of the offsets buffer
+  ///
+  /// Only valid if this is a view into a varbinary type
+  const uint32_t* offsets() const {
+    DCHECK(!metadata_.is_fixed_length);
+    return reinterpret_cast<const uint32_t*>(data(1));
+  }
+  /// \brief Return the type metadata
+  const KeyColumnMetadata& metadata() const { return metadata_; }
+  /// \brief Return the length (in rows) of the array
+  int64_t length() const { return length_; }
+  /// \brief Return the bit offset into the corresponding vector
+  ///
+  /// if i == 1 then this must be a bool array
+  int bit_offset(int i) const {
+    ARROW_DCHECK(i >= 0 && i < max_buffers_);
+    return bit_offset_[i];
+  }
+
+ private:
+  static constexpr int max_buffers_ = 3;
+  const uint8_t* buffers_[max_buffers_];
+  uint8_t* mutable_buffers_[max_buffers_];
+  KeyColumnMetadata metadata_;
+  int64_t length_;
+  // Starting bit offset within the first byte (between 0 and 7)
+  // to be used when accessing buffers that store bit vectors.
+  int bit_offset_[max_buffers_ - 1];
+};
+
+/// \brief Create KeyColumnMetadata from a DataType
+///
+/// If `type` is a dictionary type then this will return the KeyColumnMetadata for
+/// the indices type
+///
+/// The caller should ensure this is only called on "key" columns.  Calling this with
+/// a non-key column will return a meaningless value (or abort on a debug build)

Review Comment:
   > Looks good to me, I guess I already reviewed a bunch of this code on the other PR. One thing: is it worth adding `Hashing32::HashBatch` and `Hashing64::HashBatch` in this PR? I need those for Bloom filter pushdown, and I'm not sure whether they fit better here or there.
   
   Hashing is unrelated to the topic of this PR, but we could possibly make a tiny separate PR just for HashBatch (or put it into Bloom filter pushdown).
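
   As an aside for anyone reading the quoted header for the first time, here is a minimal usage sketch (not from this PR; the function name and buffer arguments are placeholders) of the read-only view constructor declared above, written as if inside the arrow::compute namespace:

   // Wrap an existing int32 validity/value buffer pair without copying.
   KeyColumnArray WrapInt32Column(const uint8_t* validity, const uint8_t* values,
                                  int64_t length) {
     // int32 is fixed-length, 4 bytes per value, and not the null type.
     KeyColumnMetadata metadata(/*is_fixed_length_in=*/true,
                                /*fixed_length_in=*/sizeof(int32_t));
     // The third buffer is unused for fixed-length columns, so pass nullptr;
     // the bit offsets default to 0.  The view is non-owning: `validity` and
     // `values` must outlive the returned object.
     return KeyColumnArray(metadata, length, validity, values, /*buffer2=*/nullptr);
   }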



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org