You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/28 21:54:57 UTC

[GitHub] [arrow] wesm commented on a change in pull request #7507: ARROW-8797: [C++] Read RecordBatch in a different endian

wesm commented on a change in pull request #7507:
URL: https://github.com/apache/arrow/pull/7507#discussion_r513711536



##########
File path: cpp/src/arrow/array/util.h
##########
@@ -37,6 +37,11 @@ namespace arrow {
 ARROW_EXPORT
 std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data);
 
+/// \brief Swap endian of each element in a generic ArrayData
+/// \param[in] data the array contents to be swapped
+ARROW_EXPORT
+void SwapEndianArrayData(std::shared_ptr<ArrayData>& data);

Review comment:
       Since this is allocating memory anyway, it might be better for this to return a new ArrayData and leave the existing one unmodified, unless you are trying to optimize memory use. In either case, a mutable reference should not be passed -- use `ArrayData*` for in-place modification (not recommended, since it will only work if the buffers are mutable, which is not guaranteed) or `const ArrayData&` for a copy.
   
   Also, it may be better for this to be in an `internal::` namespace for now.

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;
+    }
+    return Status::OK();
+  }
+
+  template <typename VALUE_TYPE>
+  Status SwapOffset(int index) {
+    if (data_->buffers[index] == nullptr) {
+      return Status::OK();
+    }
+    auto data = reinterpret_cast<const VALUE_TYPE*>(data_->buffers[index]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
+                          AllocateBuffer(data_->buffers[index]->size() + 1));
+    auto new_data = reinterpret_cast<VALUE_TYPE*>(new_buffer->mutable_data());
+    // offset has one more element rather than data->length
+    int64_t length = length_ + 1;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[index] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+  Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+  template <typename T>
+  Status Visit(const T&) {
+    using value_type = typename T::c_type;
+    auto data = reinterpret_cast<const value_type*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<value_type*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal128Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp;
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      tmp = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#else
+      tmp = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal256Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp0, tmp1, tmp2;
+      auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+      tmp0 = BitUtil::FromBigEndian(data[idx]);
+      tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#else
+      tmp0 = BitUtil::FromLittleEndian(data[idx]);
+      tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const DayTimeIntervalType& type) {
+    auto data = reinterpret_cast<const uint32_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint32_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      new_data[idx] = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromBigEndian(data[idx + 1]);
+#else
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromLittleEndian(data[idx + 1]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const NullType& type) { return Status::OK(); }
+  Status Visit(const BooleanType& type) { return Status::OK(); }
+  Status Visit(const Int8Type& type) { return Status::OK(); }
+  Status Visit(const UInt8Type& type) { return Status::OK(); }
+  Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
+  Status Visit(const FixedSizeListType& type) { return Status::OK(); }
+  Status Visit(const StructType& type) { return Status::OK(); }
+  Status Visit(const SparseUnionType& type) { return Status::OK(); }
+
+  Status Visit(const StringType& type) {
+    RETURN_NOT_OK(SwapSmallOffset());
+    return Status::OK();
+  }
+  Status Visit(const LargeStringType& type) {
+    RETURN_NOT_OK(SwapLargeOffset());
+    return Status::OK();
+  }
+  Status Visit(const BinaryType& type) {
+    RETURN_NOT_OK(SwapSmallOffset());
+    return Status::OK();
+  }
+  Status Visit(const LargeBinaryType& type) {
+    RETURN_NOT_OK(SwapLargeOffset());
+    return Status::OK();
+  }
+
+  Status Visit(const ListType& type) {
+    RETURN_NOT_OK(SwapSmallOffset());
+    return Status::OK();
+  }
+  Status Visit(const LargeListType& type) {
+    RETURN_NOT_OK(SwapLargeOffset());
+    return Status::OK();
+  }
+
+  Status Visit(const MapType& type) {

Review comment:
       This case is covered by `ListType`

##########
File path: cpp/src/arrow/type.h
##########
@@ -1604,13 +1605,26 @@ class ARROW_EXPORT FieldRef {
 // ----------------------------------------------------------------------
 // Schema
 
+enum class Endianness {
+  LITTLE = 0,
+  BIG = 1,
+#if ARROW_LITTLE_ENDIAN
+  NATIVE = LITTLE
+#else
+  NATIVE = BIG
+#endif
+};

Review comment:
       Having endianness at the Schema level definitely has me scratching my head, since if you have only an `ArrayData` object, you have no way to know its endianness. In NumPy at least, which allows you to have non-native-endian data in-memory, the endianness is handled at the type level, so you could have a structured dtype in NumPy that has a mix of little and big-endian data. 
   
   Of course, in the Arrow protocol it's all or nothing -- either all of the data has to be little or big-endian in a protocol message. 
   
   One alternative to what's been done here is to have endianness at the type level and have an option in the IpcWriteOptions to force either native-endianness or little/big endian, which would cause any ArrayData with different endianness to be byte-swapped. 

##########
File path: cpp/src/arrow/ipc/options.h
##########
@@ -93,6 +93,12 @@ struct ARROW_EXPORT IpcReadOptions {
   /// like decompression
   bool use_threads = true;
 
+  /// \brief Convert endian of data element to platform-native endianness
+  /// if the endianness of the received schema is not equal to
+  /// platform-native endianness. This is effective at RecordBatchFileReader.Open(),
+  ///  RecordBatchStreamReader(), or StreamDecoder().
+  bool ensure_native_endian = true;

Review comment:
       See comments below about this

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -963,6 +1003,12 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader {
     // Get the schema and record any observed dictionaries
     RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options, &dictionary_memo_,
                                       &schema_, &out_schema_, &field_inclusion_mask_));
+    swap_endian_ = options.ensure_native_endian && !out_schema_->IsNativeEndianness();
+    if (swap_endian_) {
+      // create a new schema with native endianness before swapping endian in ArrayData
+      schema_ = schema_->WithNativeEndianness();
+      out_schema_ = out_schema_->WithNativeEndianness();
+    }

Review comment:
       This is basically the same as above, which suggests that something needs to be refactored.

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;

Review comment:
       Can you not just call `SwapEndianArrayData` here?

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;
+    }
+    return Status::OK();
+  }
+
+  template <typename VALUE_TYPE>
+  Status SwapOffset(int index) {
+    if (data_->buffers[index] == nullptr) {
+      return Status::OK();
+    }
+    auto data = reinterpret_cast<const VALUE_TYPE*>(data_->buffers[index]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
+                          AllocateBuffer(data_->buffers[index]->size() + 1));
+    auto new_data = reinterpret_cast<VALUE_TYPE*>(new_buffer->mutable_data());
+    // offset has one more element rather than data->length
+    int64_t length = length_ + 1;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[index] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+  Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+  template <typename T>
+  Status Visit(const T&) {
+    using value_type = typename T::c_type;
+    auto data = reinterpret_cast<const value_type*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<value_type*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal128Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp;
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      tmp = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#else
+      tmp = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal256Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp0, tmp1, tmp2;
+      auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+      tmp0 = BitUtil::FromBigEndian(data[idx]);
+      tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#else
+      tmp0 = BitUtil::FromLittleEndian(data[idx]);
+      tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const DayTimeIntervalType& type) {
+    auto data = reinterpret_cast<const uint32_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint32_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      new_data[idx] = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromBigEndian(data[idx + 1]);
+#else
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromLittleEndian(data[idx + 1]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const NullType& type) { return Status::OK(); }
+  Status Visit(const BooleanType& type) { return Status::OK(); }
+  Status Visit(const Int8Type& type) { return Status::OK(); }
+  Status Visit(const UInt8Type& type) { return Status::OK(); }
+  Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
+  Status Visit(const FixedSizeListType& type) { return Status::OK(); }
+  Status Visit(const StructType& type) { return Status::OK(); }
+  Status Visit(const SparseUnionType& type) { return Status::OK(); }
+
+  Status Visit(const StringType& type) {

Review comment:
       This case is covered by `BinaryType` (you have to use `enable_if_binary`).

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;
+    }
+    return Status::OK();
+  }
+
+  template <typename VALUE_TYPE>
+  Status SwapOffset(int index) {
+    if (data_->buffers[index] == nullptr) {
+      return Status::OK();
+    }
+    auto data = reinterpret_cast<const VALUE_TYPE*>(data_->buffers[index]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
+                          AllocateBuffer(data_->buffers[index]->size() + 1));
+    auto new_data = reinterpret_cast<VALUE_TYPE*>(new_buffer->mutable_data());
+    // offset has one more element rather than data->length
+    int64_t length = length_ + 1;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[index] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+  Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+  template <typename T>
+  Status Visit(const T&) {
+    using value_type = typename T::c_type;
+    auto data = reinterpret_cast<const value_type*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<value_type*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }

Review comment:
       There is code duplication here. Could a function be factored out, like:
   
   ```c++
   ARROW_ASSIGN_OR_RAISE(out_buffers[i], ByteSwapBuffer<T>(*in_buffers[i], length, memory_pool));
   ```

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;
+    }
+    return Status::OK();
+  }
+
+  template <typename VALUE_TYPE>
+  Status SwapOffset(int index) {
+    if (data_->buffers[index] == nullptr) {
+      return Status::OK();
+    }
+    auto data = reinterpret_cast<const VALUE_TYPE*>(data_->buffers[index]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
+                          AllocateBuffer(data_->buffers[index]->size() + 1));
+    auto new_data = reinterpret_cast<VALUE_TYPE*>(new_buffer->mutable_data());
+    // offset has one more element rather than data->length
+    int64_t length = length_ + 1;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[index] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+  Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+  template <typename T>
+  Status Visit(const T&) {
+    using value_type = typename T::c_type;
+    auto data = reinterpret_cast<const value_type*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<value_type*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal128Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp;
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      tmp = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#else
+      tmp = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal256Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp0, tmp1, tmp2;
+      auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+      tmp0 = BitUtil::FromBigEndian(data[idx]);
+      tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#else
+      tmp0 = BitUtil::FromLittleEndian(data[idx]);
+      tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const DayTimeIntervalType& type) {
+    auto data = reinterpret_cast<const uint32_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint32_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      new_data[idx] = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromBigEndian(data[idx + 1]);
+#else
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromLittleEndian(data[idx + 1]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const NullType& type) { return Status::OK(); }
+  Status Visit(const BooleanType& type) { return Status::OK(); }
+  Status Visit(const Int8Type& type) { return Status::OK(); }
+  Status Visit(const UInt8Type& type) { return Status::OK(); }
+  Status Visit(const FixedSizeBinaryType& type) { return Status::OK(); }
+  Status Visit(const FixedSizeListType& type) { return Status::OK(); }
+  Status Visit(const StructType& type) { return Status::OK(); }
+  Status Visit(const SparseUnionType& type) { return Status::OK(); }
+
+  Status Visit(const StringType& type) {
+    RETURN_NOT_OK(SwapSmallOffset());
+    return Status::OK();
+  }
+  Status Visit(const LargeStringType& type) {

Review comment:
       Covered by LargeBinaryType

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -84,6 +283,12 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {
   return out;
 }
 
+void SwapEndianArrayData(std::shared_ptr<ArrayData>& data) {
+  internal::ArrayDataEndianSwapper swapper_visitor(data, data->length);
+  DCHECK_OK(VisitTypeInline(*data->type, &swapper_visitor));
+  DCHECK_OK(swapper_visitor.SwapChildren((*data->type).fields()));

Review comment:
       Possibly better to inline the SwapChildren implementation here

##########
File path: cpp/src/arrow/array/util.cc
##########
@@ -74,6 +75,204 @@ class ArrayDataWrapper {
   std::shared_ptr<Array>* out_;
 };
 
+class ArrayDataEndianSwapper {
+ public:
+  ArrayDataEndianSwapper(std::shared_ptr<ArrayData>& data, int64_t length)
+      : data_(data), length_(length) {}
+
+  Status SwapType(const DataType& type) {
+    RETURN_NOT_OK(VisitTypeInline(type, this));
+    RETURN_NOT_OK(SwapChildren(type.fields()));
+    return Status::OK();
+  }
+
+  Status SwapChildren(std::vector<std::shared_ptr<Field>> child_fields) {
+    int i = 0;
+    for (const auto& child_field : child_fields) {
+      ArrayDataEndianSwapper swapper_child_visitor(data_->child_data[i],
+                                                   data_->child_data[i]->length);
+      RETURN_NOT_OK(VisitTypeInline(*child_field.get()->type(), &swapper_child_visitor));
+      RETURN_NOT_OK(
+          swapper_child_visitor.SwapChildren((*child_field.get()->type()).fields()));
+      i++;
+    }
+    return Status::OK();
+  }
+
+  template <typename VALUE_TYPE>
+  Status SwapOffset(int index) {
+    if (data_->buffers[index] == nullptr) {
+      return Status::OK();
+    }
+    auto data = reinterpret_cast<const VALUE_TYPE*>(data_->buffers[index]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer,
+                          AllocateBuffer(data_->buffers[index]->size() + 1));
+    auto new_data = reinterpret_cast<VALUE_TYPE*>(new_buffer->mutable_data());
+    // offset has one more element rather than data->length
+    int64_t length = length_ + 1;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[index] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status SwapSmallOffset(int index = 1) { return SwapOffset<int32_t>(index); }
+
+  Status SwapLargeOffset() { return SwapOffset<int64_t>(1); }
+
+  template <typename T>
+  Status Visit(const T&) {
+    using value_type = typename T::c_type;
+    auto data = reinterpret_cast<const value_type*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<value_type*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+#if ARROW_LITTLE_ENDIAN
+      new_data[i] = BitUtil::FromBigEndian(data[i]);
+#else
+      new_data[i] = BitUtil::FromLittleEndian(data[i]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal128Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp;
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      tmp = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#else
+      tmp = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 1]);
+      new_data[idx + 1] = tmp;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const Decimal256Type& type) {
+    auto data = reinterpret_cast<const uint64_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint64_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      uint64_t tmp0, tmp1, tmp2;
+      auto idx = i * 4;
+#if ARROW_LITTLE_ENDIAN
+      tmp0 = BitUtil::FromBigEndian(data[idx]);
+      tmp1 = BitUtil::FromBigEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromBigEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromBigEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#else
+      tmp0 = BitUtil::FromLittleEndian(data[idx]);
+      tmp1 = BitUtil::FromLittleEndian(data[idx + 1]);
+      tmp2 = BitUtil::FromLittleEndian(data[idx + 2]);
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx + 3]);
+      new_data[idx + 1] = tmp2;
+      new_data[idx + 2] = tmp1;
+      new_data[idx + 3] = tmp0;
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }
+
+  Status Visit(const DayTimeIntervalType& type) {
+    auto data = reinterpret_cast<const uint32_t*>(data_->buffers[1]->data());
+    ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateBuffer(data_->buffers[1]->size()));
+    auto new_data = reinterpret_cast<uint32_t*>(new_buffer->mutable_data());
+    int64_t length = length_;
+    for (int64_t i = 0; i < length; i++) {
+      auto idx = i * 2;
+#if ARROW_LITTLE_ENDIAN
+      new_data[idx] = BitUtil::FromBigEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromBigEndian(data[idx + 1]);
+#else
+      new_data[idx] = BitUtil::FromLittleEndian(data[idx]);
+      new_data[idx + 1] = BitUtil::FromLittleEndian(data[idx + 1]);
+#endif
+    }
+    data_->buffers[1] = std::move(new_buffer);
+    return Status::OK();
+  }

Review comment:
       Following the suggestion above, wouldn't this just be `ByteSwapBuffer<uint32_t>(*data_->buffers[1], length_ * 2, ...)`?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -664,14 +690,15 @@ Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
   std::shared_ptr<Schema> out_schema;
   // Empty means do not use
   std::vector<bool> inclusion_mask;
-  RETURN_NOT_OK(GetInclusionMaskAndOutSchema(schema, options.included_fields,
+  DictionaryKind kind;

Review comment:
       kind is never used? See above comment re: this parameter

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -481,24 +500,30 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
     filtered_columns = std::move(columns);
   }
   if (compression != Compression::UNCOMPRESSED) {
-    RETURN_NOT_OK(DecompressBuffers(compression, options, &filtered_columns));
+    RETURN_NOT_OK(DecompressBuffers(compression, context.options, &filtered_columns));
   }
 
+  // swap endian in a set of ArrayData if necessary (swap_endian == true)
+  if (context.swap_endian) {
+    for (int i = 0; i < static_cast<int>(filtered_columns.size()); ++i) {
+      SwapEndianArrayData(filtered_columns[i]);
+    }
+  }
   return RecordBatch::Make(filtered_schema, metadata->length(),
                            std::move(filtered_columns));
 }
 
 Result<std::shared_ptr<RecordBatch>> LoadRecordBatch(
     const flatbuf::RecordBatch* metadata, const std::shared_ptr<Schema>& schema,
-    const std::vector<bool>& inclusion_mask, const DictionaryMemo* dictionary_memo,
-    const IpcReadOptions& options, MetadataVersion metadata_version,
-    Compression::type compression, io::RandomAccessFile* file) {
+    const std::vector<bool>& inclusion_mask, const IpcReadContext& context,
+    MetadataVersion metadata_version, Compression::type compression,
+    io::RandomAccessFile* file) {
   if (inclusion_mask.size() > 0) {
-    return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, dictionary_memo,
-                                 options, metadata_version, compression, file);
-  } else {
-    return LoadRecordBatchSubset(metadata, schema, nullptr, dictionary_memo, options,
+    return LoadRecordBatchSubset(metadata, schema, &inclusion_mask, context,
                                  metadata_version, compression, file);
+  } else {
+    return LoadRecordBatchSubset(metadata, schema, nullptr, context, metadata_version,

Review comment:
       Nit: add a `/*param_name=*/` comment for this `nullptr` argument.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -699,42 +726,45 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
 
   // Look up the dictionary value type, which must have been added to the
   // DictionaryMemo already prior to invoking this function
-  ARROW_ASSIGN_OR_RAISE(auto value_type, dictionary_memo->GetDictionaryType(id));
+  ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
 
   // Load the dictionary data from the dictionary batch
   ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
-                     options, file);
-  const auto dict_data = std::make_shared<ArrayData>();
+                     context.options, file);
+  auto dict_data = std::make_shared<ArrayData>();
   const Field dummy_field("", value_type);
   RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
 
   if (compression != Compression::UNCOMPRESSED) {
     ArrayDataVector dict_fields{dict_data};
-    RETURN_NOT_OK(DecompressBuffers(compression, options, &dict_fields));
+    RETURN_NOT_OK(DecompressBuffers(compression, context.options, &dict_fields));
+  }
+
+  // swap endian in dict_data if necessary (swap_endian == true)
+  if (context.swap_endian) {
+    SwapEndianArrayData(dict_data);
   }
 
   if (dictionary_batch->isDelta()) {
-    if (kind != nullptr) {
-      *kind = DictionaryKind::Delta;
+    if (context.kind != nullptr) {
+      *context.kind = DictionaryKind::Delta;
     }
-    return dictionary_memo->AddDictionaryDelta(id, dict_data);
+    return context.dictionary_memo->AddDictionaryDelta(id, dict_data);
   }
   ARROW_ASSIGN_OR_RAISE(bool inserted,
-                        dictionary_memo->AddOrReplaceDictionary(id, dict_data));
-  if (kind != nullptr) {
-    *kind = inserted ? DictionaryKind::New : DictionaryKind::Replacement;
+                        context.dictionary_memo->AddOrReplaceDictionary(id, dict_data));
+  if (context.kind != nullptr) {
+    *context.kind = inserted ? DictionaryKind::New : DictionaryKind::Replacement;
   }

Review comment:
       I think it's better to leave the DictionaryKind as an out param

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -107,6 +108,23 @@ Status InvalidMessageType(MessageType expected, MessageType actual) {
 // ----------------------------------------------------------------------
 // Record batch read path
 
+/// \brief Structure to keep common arguments to be passed
+struct IpcReadContext {
+  IpcReadContext(DictionaryMemo* memo, const IpcReadOptions& option, DictionaryKind* kind,
+                 bool swap)
+      : dictionary_memo(memo), options(option), kind(kind), swap_endian(swap) {}
+
+  DictionaryMemo* dictionary_memo;
+
+  const IpcReadOptions& options;
+
+  DictionaryKind* kind;

Review comment:
       Having a mutable pointer to an enum here seems odd. Can you explain why it is needed?

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -699,42 +726,45 @@ Status ReadDictionary(const Buffer& metadata, DictionaryMemo* dictionary_memo,
 
   // Look up the dictionary value type, which must have been added to the
   // DictionaryMemo already prior to invoking this function
-  ARROW_ASSIGN_OR_RAISE(auto value_type, dictionary_memo->GetDictionaryType(id));
+  ARROW_ASSIGN_OR_RAISE(auto value_type, context.dictionary_memo->GetDictionaryType(id));
 
   // Load the dictionary data from the dictionary batch
   ArrayLoader loader(batch_meta, internal::GetMetadataVersion(message->version()),
-                     options, file);
-  const auto dict_data = std::make_shared<ArrayData>();
+                     context.options, file);
+  auto dict_data = std::make_shared<ArrayData>();
   const Field dummy_field("", value_type);
   RETURN_NOT_OK(loader.Load(&dummy_field, dict_data.get()));
 
   if (compression != Compression::UNCOMPRESSED) {
     ArrayDataVector dict_fields{dict_data};
-    RETURN_NOT_OK(DecompressBuffers(compression, options, &dict_fields));
+    RETURN_NOT_OK(DecompressBuffers(compression, context.options, &dict_fields));
+  }
+
+  // swap endian in dict_data if necessary (swap_endian == true)
+  if (context.swap_endian) {
+    SwapEndianArrayData(dict_data);

Review comment:
       Shouldn't this be able to fail (e.g. for a failed allocation)? I think it would be better to do
   
   ```
   ARROW_ASSIGN_OR_RAISE(dict_data, SwapEndianness(*dict_data, ...));
   ```

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -753,8 +783,15 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
       return Status::Invalid("Tried reading schema message, was null or length 0");
     }
 
-    return UnpackSchemaMessage(*message, options, &dictionary_memo_, &schema_,
-                               &out_schema_, &field_inclusion_mask_);
+    RETURN_NOT_OK(UnpackSchemaMessage(*message, options, &dictionary_memo_, &schema_,
+                                      &out_schema_, &field_inclusion_mask_));
+    swap_endian_ = options.ensure_native_endian && !out_schema_->IsNativeEndianness();
+    if (swap_endian_) {
+      // create a new schema with native endianness before swapping endian in ArrayData
+      schema_ = schema_->WithNativeEndianness();
+      out_schema_ = out_schema_->WithNativeEndianness();
+    }

Review comment:
       This code block occurs multiple times. Is there a way to avoid the duplication?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org