You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/06/09 13:03:32 UTC

[arrow-nanoarrow] branch main updated: feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)

This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d9efd8  feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)
2d9efd8 is described below

commit 2d9efd877216d5c7280828bcf81b9338baf31ca9
Author: Dewey Dunnington <de...@dunnington.ca>
AuthorDate: Fri Jun 9 09:03:26 2023 -0400

    feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)
    
    Ensures IPC reader compatibility when reading little-endian streams on
    big-endian platforms (mostly so as not to scare away potential
    dependents that run their tests on big-endian systems). It was also not
    hard to do, since flatcc already did the hard work of defining
    `bswap16()`, `bswap32()`, and `bswap64()` based on compiler defines.
---
 .../src/nanoarrow/nanoarrow_ipc_decoder.c          | 129 ++++++++++++++-
 .../src/nanoarrow/nanoarrow_ipc_decoder_test.cc    | 177 +++++++++++++++++++--
 .../src/nanoarrow/nanoarrow_ipc_files_test.cc      |  20 ++-
 .../src/nanoarrow/nanoarrow_ipc_reader_test.cc     |  17 --
 4 files changed, 304 insertions(+), 39 deletions(-)

diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index 571dd79..a38398f 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -1221,6 +1221,8 @@ struct ArrowIpcBufferSource {
   int64_t body_offset_bytes;
   int64_t buffer_length_bytes;
   enum ArrowIpcCompressionType codec;
+  enum ArrowType data_type;
+  int32_t element_size_bits;
   int swap_endian;
 };
 
@@ -1297,6 +1299,118 @@ static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
   return out;
 }
 
+// Scratch layout used only to endian-swap INTERVAL_MONTH_DAY_NANO values field by field
+struct ArrowIpcIntervalMonthDayNano {
+  uint32_t months;  // swapped as one 32-bit unit
+  uint32_t days;    // swapped as one 32-bit unit
+  uint64_t ns;      // swapped as one 64-bit unit
+};  // NOTE(review): assumed to be exactly 16 bytes with no padding; the swap loop memcpy()s 16 bytes per value
+
+static int ArrowIpcDecoderSwapEndian(struct ArrowIpcBufferSource* src,  // byte-swap out_view's elements per src->data_type / src->element_size_bits
+                                     struct ArrowBufferView* out_view,  // in: source bytes; on success repointed at the swapped bytes
+                                     struct ArrowBuffer* dst, struct ArrowError* error) {  // dst: output (replaced if shared); error: set on ENOTSUP
+  // Buffers of 1-bit/1-byte elements and string/binary data bytes need no swapping
+  switch (src->data_type) {
+    case NANOARROW_TYPE_BOOL:
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_BINARY:
+      return NANOARROW_OK;  // out_view and dst are left untouched
+    default:
+      break;
+  }
+
+  // If dst wraps a shared buffer (non-NULL allocator private_data) we must not write into it: park it in tmp and start a fresh dst
+  struct ArrowBuffer tmp;  // presumably also keeps the bytes out_view points at alive until the swap is done
+  ArrowBufferInit(&tmp);
+
+  if (dst->allocator.private_data != NULL) {
+    ArrowBufferMove(dst, &tmp);
+    ArrowBufferInit(dst);
+  }
+
+  if (dst->size_bytes == 0) {  // allocate the output; otherwise dst's existing bytes are overwritten
+    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(dst, out_view->size_bytes));  // NOTE(review): this early return leaks tmp when it holds a shared buffer; reset tmp first
+    dst->size_bytes = out_view->size_bytes;
+  }
+
+  switch (src->data_type) {
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256: {  // reverse all bytes of each 128/256-bit value
+      const uint64_t* ptr_src = out_view->data.as_uint64;
+      uint64_t* ptr_dst = (uint64_t*)dst->data;
+      uint64_t words[4];
+      int n_words = src->element_size_bits / 64;  // 2 (decimal128) or 4 (decimal256); words[] holds at most 4
+
+      for (int64_t i = 0; i < (dst->size_bytes / n_words / 8); i++) {
+        for (int j = 0; j < n_words; j++) {  // byte-swap each 64-bit word of the value...
+          words[j] = bswap64(ptr_src[i * n_words + j]);
+        }
+
+        for (int j = 0; j < n_words; j++) {  // ...then store the words in reverse order
+          ptr_dst[i * n_words + j] = words[n_words - j - 1];
+        }
+      }
+      break;
+    }
+    case NANOARROW_TYPE_INTERVAL_DAY_TIME: {  // swapped as independent 32-bit units, not as one 64-bit value
+      uint32_t* ptr = (uint32_t*)dst->data;
+      for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
+        ptr[i] = bswap32(out_view->data.as_uint32[i]);
+      }
+      break;
+    }
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: {  // 32-bit months, 32-bit days, 64-bit nanoseconds per value
+      const uint8_t* ptr_src = out_view->data.as_uint8;
+      uint8_t* ptr_dst = dst->data;
+      int item_size_bytes = 16;
+      struct ArrowIpcIntervalMonthDayNano item;
+      for (int64_t i = 0; i < (dst->size_bytes / item_size_bytes); i++) {
+        memcpy(&item, ptr_src + i * item_size_bytes, item_size_bytes);  // memcpy avoids unaligned struct access
+        item.months = bswap32(item.months);
+        item.days = bswap32(item.days);
+        item.ns = bswap64(item.ns);
+        memcpy(ptr_dst + i * item_size_bytes, &item, item_size_bytes);
+      }
+      break;
+    }
+    default:  // plain fixed-width elements: swap by element bit width
+      switch (src->element_size_bits) {
+        case 16: {
+          uint16_t* ptr = (uint16_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 2); i++) {
+            ptr[i] = bswap16(out_view->data.as_uint16[i]);
+          }
+          break;
+        }
+        case 32: {
+          uint32_t* ptr = (uint32_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
+            ptr[i] = bswap32(out_view->data.as_uint32[i]);
+          }
+          break;
+        }
+        case 64: {
+          uint64_t* ptr = (uint64_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 8); i++) {
+            ptr[i] = bswap64(out_view->data.as_uint64[i]);
+          }
+          break;
+        }
+        default:
+          ArrowErrorSet(error, "Endian swapping for element bitwidth %d is not supported",
+                        (int)src->element_size_bits);
+          return ENOTSUP;  // NOTE(review): also leaks tmp when it holds a shared buffer; reset tmp first
+      }
+      break;
+  }
+
+  ArrowBufferReset(&tmp);  // release the parked shared buffer, if any
+  out_view->data.data = dst->data;  // point the view at the swapped bytes
+  return NANOARROW_OK;
+}
+
 struct ArrowIpcArraySetter {
   ns(FieldNode_vec_t) fields;
   int64_t field_i;
@@ -1334,16 +1448,16 @@ static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t
     return ENOTSUP;
   }
 
-  if (setter->src.swap_endian) {
-    ArrowErrorSet(error,
-                  "The nanoarrow_ipc extension does not support non-system endianness");
-    return ENOTSUP;
-  }
-
   setter->src.body_offset_bytes = offset;
   setter->src.buffer_length_bytes = length;
   NANOARROW_RETURN_NOT_OK(
       setter->factory.make_buffer(&setter->factory, &setter->src, out_view, out, error));
+
+  if (setter->src.swap_endian) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcDecoderSwapEndian(&setter->src, out_view, out, error));
+  }
+
   return NANOARROW_OK;
 }
 
@@ -1412,6 +1526,9 @@ static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcArraySetter* setter,
       buffer_dst->size_bytes = 0;
     }
 
+    setter->src.data_type = array_view->layout.buffer_data_type[i];
+    setter->src.element_size_bits = array_view->layout.element_size_bits[i];
+
     NANOARROW_RETURN_NOT_OK(
         ArrowIpcDecoderMakeBuffer(setter, buffer_offset, buffer_length,
                                   &array_view->buffer_views[i], buffer_dst, error));
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
index 62e4bb0..3f8653d 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
@@ -330,19 +330,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
   EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support compression");
   decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
 
-  // Field extract should fail on non-system endian
-  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) {
-    ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_BIG);
-  } else {
-    ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_LITTLE);
-  }
-  EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
-                                       NANOARROW_VALIDATION_LEVEL_FULL, &error),
-            ENOTSUP);
-  EXPECT_STREQ(error.message,
-               "The nanoarrow_ipc extension does not support non-system endianness");
-  ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_UNINITIALIZED);
-
   // Field extract should fail if body is too small
   decoder.body_size_bytes = 0;
   EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
@@ -525,7 +512,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) {
 
 TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) {
   if (!ArrowIpcSharedBufferIsThreadSafe()) {
-    GTEST_SKIP();
+    GTEST_SKIP() << "ArrowIpcSharedBufferIsThreadSafe() returned false";
   }
 
   struct ArrowIpcDecoder decoder;
@@ -748,3 +735,165 @@ INSTANTIATE_TEST_SUITE_P(
         arrow::schema({}, arrow::KeyValueMetadata::Make({"key1"}, {"value1"})),
         // Non-nullable field
         arrow::schema({arrow::field("some_name", arrow::int32(), false)})));
+
+class ArrowTypeIdParameterizedTestFixture  // parameterized over the storage type under test
+    : public ::testing::TestWithParam<enum ArrowType> {
+ protected:
+  enum ArrowType data_type;  // NOTE(review): unused; the test body declares its own local data_type from GetParam()
+};
+
+TEST_P(ArrowTypeIdParameterizedTestFixture, NanoarrowIpcDecodeSwapEndian) {  // decode hand-swapped bytes as non-native endian; expect the original bytes back
+  enum ArrowType data_type = GetParam();
+  int64_t n_elements_test = 10;
+
+  // Make a data buffer long enough for 10 Decimal256s with a pattern
+  // where an endian swap isn't silently the same value (e.g., 0s)
+  uint8_t data_buffer[32 * 10];
+  for (int64_t i = 0; i < sizeof(data_buffer); i++) {  // NOTE(review): int64_t vs size_t comparison may trigger -Wsign-compare
+    data_buffer[i] = i % 256;
+  }
+
+  int bit_width;  // assigned by every switch branch below that does not fail the test
+  std::shared_ptr<arrow::DataType> arrow_data_type;
+  switch (data_type) {
+    case NANOARROW_TYPE_BOOL:
+      bit_width = 1;
+      arrow_data_type = arrow::boolean();
+      break;
+    case NANOARROW_TYPE_INT8:
+      bit_width = 8;
+      arrow_data_type = arrow::int8();
+      break;
+    case NANOARROW_TYPE_INT16:
+      bit_width = 16;
+      arrow_data_type = arrow::int16();
+      break;
+    case NANOARROW_TYPE_INT32:
+      bit_width = 32;
+      arrow_data_type = arrow::int32();
+      break;
+    case NANOARROW_TYPE_INT64:
+      bit_width = 64;
+      arrow_data_type = arrow::int64();
+      break;
+    case NANOARROW_TYPE_DECIMAL128:
+      arrow_data_type = arrow::decimal128(10, 3);
+      bit_width = 128;
+      break;
+    case NANOARROW_TYPE_DECIMAL256:
+      bit_width = 256;
+      arrow_data_type = arrow::decimal256(10, 3);
+      break;
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      bit_width = 128;
+      arrow_data_type = arrow::month_day_nano_interval();
+      break;
+    default:
+      GTEST_FAIL() << "Type not supported for test";
+  }
+
+  // Build the non-native-endian bytes by hand, independently of the decoder's swap code
+  uint8_t data_buffer_swapped[32 * 10];
+  int64_t n_elements = sizeof(data_buffer) * 8 / bit_width;  // elements in the whole pattern (more than n_elements_test)
+  if (bit_width > 8 && data_type != NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO) {
+    int byte_width = bit_width / 8;
+    for (int64_t i = 0; i < n_elements; i++) {
+      uint8_t* src = data_buffer + (i * byte_width);
+      uint8_t* dst = data_buffer_swapped + (i * byte_width);
+      for (int j = 0; j < byte_width; j++) {
+        dst[j] = src[byte_width - j - 1];
+      }
+    }
+  } else if (data_type == NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO) {  // reverse each field separately: 4 + 4 + 8 bytes
+    for (int64_t i = 0; i < n_elements; i++) {
+      uint8_t* src = data_buffer + (i * 16);
+      uint8_t* dst = data_buffer_swapped + (i * 16);
+
+      for (int j = 0; j < 4; j++) {
+        dst[j] = src[4 - j - 1];
+      }
+      src += 4;
+      dst += 4;
+
+      for (int j = 0; j < 4; j++) {
+        dst[j] = src[4 - j - 1];
+      }
+      src += 4;
+      dst += 4;
+
+      for (int j = 0; j < 8; j++) {
+        dst[j] = src[8 - j - 1];
+      }
+    }
+  } else {
+    memcpy(data_buffer_swapped, data_buffer, sizeof(data_buffer));  // 1- and 8-bit types are stored as-is
+  }
+
+  // Make an array wrapping the swapped buffer
+  auto empty = std::make_shared<arrow::Buffer>(nullptr, 0);
+  auto buffer =
+      std::make_shared<arrow::Buffer>(data_buffer_swapped, sizeof(data_buffer_swapped));
+  arrow::BufferVector buffers = {empty, buffer};
+  auto array_data =
+      std::make_shared<arrow::ArrayData>(arrow_data_type, n_elements_test, buffers, 0, 0);
+  auto array = arrow::MakeArray(array_data);
+
+  // Make a RecordBatch
+  auto arrow_schema = arrow::schema({arrow::field("col1", arrow_data_type)});
+  auto arrow_record_batch =
+      arrow::RecordBatch::Make(arrow_schema, n_elements_test, {array});
+
+  // Serialize it
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+  auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*arrow_record_batch, options);
+  if (!maybe_serialized.ok()) {
+    GTEST_FAIL() << maybe_serialized.status();
+  }
+  auto serialized = *maybe_serialized;
+
+  struct ArrowSchema schema;
+  if (!arrow::ExportSchema(*arrow_schema, &schema).ok()) {
+    GTEST_FAIL() << "schema export failed";
+  }
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = serialized->data();
+  data.size_bytes = serialized->size();
+
+  struct ArrowIpcDecoder decoder;
+  ArrowIpcDecoderInit(&decoder);
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+#ifdef __BIG_ENDIAN__  // NOTE(review): only correct if this macro is defined on every big-endian target; a runtime ArrowIpcSystemEndianness() check would not depend on it
+  ASSERT_EQ(ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_LITTLE),
+            NANOARROW_OK);
+#else
+  ASSERT_EQ(ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_BIG),
+            NANOARROW_OK);
+#endif
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, nullptr), NANOARROW_OK);
+  data.data.as_uint8 += decoder.header_size_bytes;
+  data.size_bytes -= decoder.header_size_bytes;
+
+  struct ArrowArrayView* array_view;
+  ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(&decoder, data, 0, &array_view, nullptr),
+            NANOARROW_OK);
+  ASSERT_EQ(array_view->storage_type, data_type);
+
+  // The decoder should have swapped the bytes back to the original pattern
+  EXPECT_EQ(memcmp(array_view->buffer_views[1].data.data, data_buffer,
+                   array_view->buffer_views[1].size_bytes),
+            0);
+
+  schema.release(&schema);  // NOTE(review): a failing ASSERT_* above returns early and skips this cleanup
+  ArrowIpcDecoderReset(&decoder);
+}
+
+INSTANTIATE_TEST_SUITE_P(NanoarrowIpcTest, ArrowTypeIdParameterizedTestFixture,  // storage types exercised by the endian-swap decode test
+                         ::testing::Values(NANOARROW_TYPE_BOOL, NANOARROW_TYPE_INT8,
+                                           NANOARROW_TYPE_INT16, NANOARROW_TYPE_INT32,
+                                           NANOARROW_TYPE_INT64,
+                                           NANOARROW_TYPE_DECIMAL128,
+                                           NANOARROW_TYPE_DECIMAL256,
+                                           NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO));  // NOTE(review): INTERVAL_DAY_TIME, special-cased in the decoder, is not covered
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
index 6c6081d..3db8f46 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
@@ -152,8 +152,7 @@ class TestFile {
         io::RandomAccessFile::GetStream(buffer_reader, 0, content_copy_wrapped->size());
 #endif
 
-    auto maybe_reader =
-        ipc::RecordBatchStreamReader::Open(input_stream);
+    auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream);
     if (!maybe_reader.ok()) {
       GTEST_FAIL() << maybe_reader.status().message();
     }
@@ -229,6 +228,23 @@ TEST_P(ArrowTestingPathParameterizedTestFixture, NanoarrowIpcTestFileNativeEndia
   param.Test(dir_builder.str());
 }
 
+TEST_P(ArrowTestingPathParameterizedTestFixture, NanoarrowIpcTestFileSwapEndian) {  // read the integration files written with the opposite endianness
+  const char* testing_dir = getenv("NANOARROW_ARROW_TESTING_DIR");
+  if (testing_dir == nullptr || strlen(testing_dir) == 0) {
+    GTEST_SKIP() << "NANOARROW_ARROW_TESTING_DIR environment variable not set";
+  }
+
+  std::stringstream dir_builder;
+
+#if defined(__BIG_ENDIAN__)  // NOTE(review): assumes __BIG_ENDIAN__ is defined on all big-endian targets; otherwise the native-endian directory is picked
+  dir_builder << testing_dir << "/data/arrow-ipc-stream/integration/1.0.0-littleendian";
+#else
+  dir_builder << testing_dir << "/data/arrow-ipc-stream/integration/1.0.0-bigendian";
+#endif
+  TestFile param = GetParam();
+  param.Test(dir_builder.str());
+}
+
 INSTANTIATE_TEST_SUITE_P(
     NanoarrowIpcTest, ArrowTestingPathParameterizedTestFixture,
     ::testing::Values(
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
index 231ff0c..f3e4a5a 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -21,8 +21,6 @@
 
 #include "nanoarrow_ipc.h"
 
-#include "flatcc/portable/pendian_detect.h"
-
 static uint8_t kSimpleSchema[] = {
     0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
     0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
@@ -163,14 +161,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) {
   schema.release(&schema);
 
   struct ArrowArray array;
-  // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.length, 3);
   array.release(&array);
-#else
-  ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
 
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.release, nullptr);
@@ -205,14 +198,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasicNoSharedBuffers) {
   schema.release(&schema);
 
   struct ArrowArray array;
-  // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.length, 3);
   array.release(&array);
-#else
-  ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
 
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.release, nullptr);
@@ -246,14 +234,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
   schema.release(&schema);
 
   struct ArrowArray array;
-  // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.length, 3);
   array.release(&array);
-#else
-  ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
 
   ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
   EXPECT_EQ(array.release, nullptr);