You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/06/09 13:03:32 UTC
[arrow-nanoarrow] branch main updated: feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)
This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 2d9efd8 feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)
2d9efd8 is described below
commit 2d9efd877216d5c7280828bcf81b9338baf31ca9
Author: Dewey Dunnington <de...@dunnington.ca>
AuthorDate: Fri Jun 9 09:03:26 2023 -0400
feat(extensions/nanoarrow_ipc): Add endian swapping to IPC reader (#214)
Ensures the IPC reader can read little-endian streams on big-endian
platforms (mostly so as not to scare away potential dependents that run
their tests on big-endian hardware). It's also not that hard to do, since
flatcc already did the hard work of defining `bswap16()`, `bswap32()`, and
`bswap64()` based on compiler defines.
---
.../src/nanoarrow/nanoarrow_ipc_decoder.c | 129 ++++++++++++++-
.../src/nanoarrow/nanoarrow_ipc_decoder_test.cc | 177 +++++++++++++++++++--
.../src/nanoarrow/nanoarrow_ipc_files_test.cc | 20 ++-
.../src/nanoarrow/nanoarrow_ipc_reader_test.cc | 17 --
4 files changed, 304 insertions(+), 39 deletions(-)
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
index 571dd79..a38398f 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
@@ -1221,6 +1221,8 @@ struct ArrowIpcBufferSource {
int64_t body_offset_bytes;
int64_t buffer_length_bytes;
enum ArrowIpcCompressionType codec;
+ enum ArrowType data_type;
+ int32_t element_size_bits;
int swap_endian;
};
@@ -1297,6 +1299,118 @@ static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
return out;
}
+// Just for the purposes of endian-swapping: one INTERVAL_MONTH_DAY_NANO
+// element viewed as its three fields (32-bit months, 32-bit days, 64-bit
+// nanoseconds) so that each field can be byte-swapped with the matching
+// bswap32()/bswap64(). The swapping code copies elements into and out of this
+// struct with memcpy(), which relies on it being exactly 16 bytes with no
+// padding and on the fields appearing in this order.
+struct ArrowIpcIntervalMonthDayNano {
+  uint32_t months;
+  uint32_t days;
+  uint64_t ns;
+};
+
+// Byte-swap the buffer that the buffer factory just produced for `src`.
+//
+// On entry, out_view describes the decoded bytes (in the stream's non-native
+// endianness) and dst is the buffer the factory filled, which may be a shared
+// buffer that must not be modified in place. The element layout is taken from
+// src->data_type and src->element_size_bits. On success, dst holds the swapped
+// bytes and out_view->data points at dst->data.
+//
+// Returns NANOARROW_OK, ENOTSUP (with `error` set) for element widths that
+// can't be swapped, or the error code from ArrowBufferReserve(). Unsupported
+// widths are rejected before dst is touched. If the reserve fails, a shared
+// buffer that was moved out of dst is handed back to dst instead of being
+// leaked.
+static int ArrowIpcDecoderSwapEndian(struct ArrowIpcBufferSource* src,
+                                     struct ArrowBufferView* out_view,
+                                     struct ArrowBuffer* dst, struct ArrowError* error) {
+  // Decide up front whether (and how) this buffer can be swapped so that no
+  // error path further down has to undo changes to dst
+  switch (src->data_type) {
+    case NANOARROW_TYPE_BOOL:
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_BINARY:
+      // Some buffer data types don't need any endian swapping
+      return NANOARROW_OK;
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256:
+      // The decimal swap below uses a fixed four-word scratch array
+      if (src->element_size_bits != 128 && src->element_size_bits != 256) {
+        ArrowErrorSet(error, "Endian swapping for element bitwidth %d is not supported",
+                      (int)src->element_size_bits);
+        return ENOTSUP;
+      }
+      break;
+    case NANOARROW_TYPE_INTERVAL_DAY_TIME:
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      break;
+    default:
+      if (src->element_size_bits != 16 && src->element_size_bits != 32 &&
+          src->element_size_bits != 64) {
+        ArrowErrorSet(error, "Endian swapping for element bitwidth %d is not supported",
+                      (int)src->element_size_bits);
+        return ENOTSUP;
+      }
+      break;
+  }
+
+  // Make sure dst is not a shared buffer that we can't modify: move it aside
+  // into tmp and give dst a fresh buffer. tmp is only released after the swap
+  // below has finished reading from out_view.
+  struct ArrowBuffer tmp;
+  ArrowBufferInit(&tmp);
+  int dst_was_shared = dst->allocator.private_data != NULL;
+
+  if (dst_was_shared) {
+    ArrowBufferMove(dst, &tmp);
+    ArrowBufferInit(dst);
+  }
+
+  if (dst->size_bytes == 0) {
+    int result = ArrowBufferReserve(dst, out_view->size_bytes);
+    if (result != NANOARROW_OK) {
+      if (dst_was_shared) {
+        // Hand the shared buffer back rather than leaking our reference to it
+        ArrowBufferReset(dst);
+        ArrowBufferMove(&tmp, dst);
+      }
+      return result;
+    }
+    dst->size_bytes = out_view->size_bytes;
+  }
+
+  switch (src->data_type) {
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256: {
+      // A decimal is one wide integer: byte-swap each 64-bit word and also
+      // reverse the order of the words within each element
+      const uint64_t* ptr_src = out_view->data.as_uint64;
+      uint64_t* ptr_dst = (uint64_t*)dst->data;
+      uint64_t words[4];
+      int n_words = src->element_size_bits / 64;
+
+      for (int64_t i = 0; i < (dst->size_bytes / n_words / 8); i++) {
+        // Read the whole element before writing any of it so that this also
+        // works if ptr_src and ptr_dst alias
+        for (int j = 0; j < n_words; j++) {
+          words[j] = bswap64(ptr_src[i * n_words + j]);
+        }
+
+        for (int j = 0; j < n_words; j++) {
+          ptr_dst[i * n_words + j] = words[n_words - j - 1];
+        }
+      }
+      break;
+    }
+    case NANOARROW_TYPE_INTERVAL_DAY_TIME: {
+      // Each 32-bit field of the element is swapped independently
+      uint32_t* ptr = (uint32_t*)dst->data;
+      for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
+        ptr[i] = bswap32(out_view->data.as_uint32[i]);
+      }
+      break;
+    }
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: {
+      // Mixed-width fields: copy each element out, swap each field with the
+      // matching width, and copy it back
+      const uint8_t* ptr_src = out_view->data.as_uint8;
+      uint8_t* ptr_dst = dst->data;
+      struct ArrowIpcIntervalMonthDayNano item;
+      const int64_t item_size_bytes = (int64_t)sizeof(item);
+      for (int64_t i = 0; i < (dst->size_bytes / item_size_bytes); i++) {
+        memcpy(&item, ptr_src + i * item_size_bytes, sizeof(item));
+        item.months = bswap32(item.months);
+        item.days = bswap32(item.days);
+        item.ns = bswap64(item.ns);
+        memcpy(ptr_dst + i * item_size_bytes, &item, sizeof(item));
+      }
+      break;
+    }
+    default:
+      // Plain fixed-width values; the width was validated above
+      switch (src->element_size_bits) {
+        case 16: {
+          uint16_t* ptr = (uint16_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 2); i++) {
+            ptr[i] = bswap16(out_view->data.as_uint16[i]);
+          }
+          break;
+        }
+        case 32: {
+          uint32_t* ptr = (uint32_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
+            ptr[i] = bswap32(out_view->data.as_uint32[i]);
+          }
+          break;
+        }
+        case 64: {
+          uint64_t* ptr = (uint64_t*)dst->data;
+          for (int64_t i = 0; i < (dst->size_bytes / 8); i++) {
+            ptr[i] = bswap64(out_view->data.as_uint64[i]);
+          }
+          break;
+        }
+        default:
+          // Unreachable: rejected before dst was modified
+          break;
+      }
+      break;
+  }
+
+  // The original (possibly shared) bytes are no longer needed
+  ArrowBufferReset(&tmp);
+  out_view->data.data = dst->data;
+  return NANOARROW_OK;
+}
+
struct ArrowIpcArraySetter {
ns(FieldNode_vec_t) fields;
int64_t field_i;
@@ -1334,16 +1448,16 @@ static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t
return ENOTSUP;
}
- if (setter->src.swap_endian) {
- ArrowErrorSet(error,
- "The nanoarrow_ipc extension does not support non-system endianness");
- return ENOTSUP;
- }
-
setter->src.body_offset_bytes = offset;
setter->src.buffer_length_bytes = length;
NANOARROW_RETURN_NOT_OK(
setter->factory.make_buffer(&setter->factory, &setter->src, out_view, out, error));
+
+ if (setter->src.swap_endian) {
+ NANOARROW_RETURN_NOT_OK(
+ ArrowIpcDecoderSwapEndian(&setter->src, out_view, out, error));
+ }
+
return NANOARROW_OK;
}
@@ -1412,6 +1526,9 @@ static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcArraySetter* setter,
buffer_dst->size_bytes = 0;
}
+ setter->src.data_type = array_view->layout.buffer_data_type[i];
+ setter->src.element_size_bits = array_view->layout.element_size_bits[i];
+
NANOARROW_RETURN_NOT_OK(
ArrowIpcDecoderMakeBuffer(setter, buffer_offset, buffer_length,
&array_view->buffer_views[i], buffer_dst, error));
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
index 62e4bb0..3f8653d 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder_test.cc
@@ -330,19 +330,6 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
EXPECT_STREQ(error.message, "The nanoarrow_ipc extension does not support compression");
decoder.codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE;
- // Field extract should fail on non-system endian
- if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) {
- ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_BIG);
- } else {
- ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_LITTLE);
- }
- EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
- NANOARROW_VALIDATION_LEVEL_FULL, &error),
- ENOTSUP);
- EXPECT_STREQ(error.message,
- "The nanoarrow_ipc extension does not support non-system endianness");
- ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_UNINITIALIZED);
-
// Field extract should fail if body is too small
decoder.body_size_bytes = 0;
EXPECT_EQ(ArrowIpcDecoderDecodeArray(&decoder, body, 0, &array,
@@ -525,7 +512,7 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatchFromShared) {
TEST(NanoarrowIpcTest, NanoarrowIpcSharedBufferThreadSafeDecode) {
if (!ArrowIpcSharedBufferIsThreadSafe()) {
- GTEST_SKIP();
+ GTEST_SKIP() << "ArrowIpcSharedBufferIsThreadSafe() returned false";
}
struct ArrowIpcDecoder decoder;
@@ -748,3 +735,165 @@ INSTANTIATE_TEST_SUITE_P(
arrow::schema({}, arrow::KeyValueMetadata::Make({"key1"}, {"value1"})),
// Non-nullable field
arrow::schema({arrow::field("some_name", arrow::int32(), false)})));
+
+// Test fixture parameterized over the nanoarrow type id of the column whose
+// data buffer is endian-swapped. The parameter is read with GetParam(); no
+// per-fixture state is needed (a member here would only be shadowed by the
+// test body's local of the same name).
+class ArrowTypeIdParameterizedTestFixture
+    : public ::testing::TestWithParam<enum ArrowType> {};
+
+// Checks that the decoder undoes a byte swap. The test data is byte-swapped by
+// hand, wrapped in a RecordBatch and serialized with Arrow C++, then decoded
+// with the decoder's endianness set to the opposite of the system's. The
+// decoded data buffer should match the original (unswapped) bytes.
+TEST_P(ArrowTypeIdParameterizedTestFixture, NanoarrowIpcDecodeSwapEndian) {
+  enum ArrowType data_type = GetParam();
+  int64_t n_elements_test = 10;
+
+  // Make a data buffer long enough for 10 Decimal256s with a pattern
+  // where an endian swap isn't silently the same value (e.g., 0s)
+  uint8_t data_buffer[32 * 10];
+  for (size_t i = 0; i < sizeof(data_buffer); i++) {
+    data_buffer[i] = static_cast<uint8_t>(i % 256);
+  }
+
+  // Every supported type below assigns bit_width; the default case fails (and
+  // returns from) the test before it is read
+  int bit_width = 0;
+  std::shared_ptr<arrow::DataType> arrow_data_type;
+  switch (data_type) {
+    case NANOARROW_TYPE_BOOL:
+      bit_width = 1;
+      arrow_data_type = arrow::boolean();
+      break;
+    case NANOARROW_TYPE_INT8:
+      bit_width = 8;
+      arrow_data_type = arrow::int8();
+      break;
+    case NANOARROW_TYPE_INT16:
+      bit_width = 16;
+      arrow_data_type = arrow::int16();
+      break;
+    case NANOARROW_TYPE_INT32:
+      bit_width = 32;
+      arrow_data_type = arrow::int32();
+      break;
+    case NANOARROW_TYPE_INT64:
+      bit_width = 64;
+      arrow_data_type = arrow::int64();
+      break;
+    case NANOARROW_TYPE_DECIMAL128:
+      arrow_data_type = arrow::decimal128(10, 3);
+      bit_width = 128;
+      break;
+    case NANOARROW_TYPE_DECIMAL256:
+      bit_width = 256;
+      arrow_data_type = arrow::decimal256(10, 3);
+      break;
+    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO:
+      bit_width = 128;
+      arrow_data_type = arrow::month_day_nano_interval();
+      break;
+    default:
+      GTEST_FAIL() << "Type not supported for test";
+  }
+
+  // "Manually" swap the endians
+  uint8_t data_buffer_swapped[32 * 10];
+  int64_t n_elements = sizeof(data_buffer) * 8 / bit_width;
+  if (bit_width > 8 && data_type != NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO) {
+    // Whole-element byte reversal
+    int byte_width = bit_width / 8;
+    for (int64_t i = 0; i < n_elements; i++) {
+      uint8_t* src = data_buffer + (i * byte_width);
+      uint8_t* dst = data_buffer_swapped + (i * byte_width);
+      for (int j = 0; j < byte_width; j++) {
+        dst[j] = src[byte_width - j - 1];
+      }
+    }
+  } else if (data_type == NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO) {
+    // Per-field byte reversal: 4-byte months, 4-byte days, 8-byte nanoseconds
+    for (int64_t i = 0; i < n_elements; i++) {
+      uint8_t* src = data_buffer + (i * 16);
+      uint8_t* dst = data_buffer_swapped + (i * 16);
+
+      for (int j = 0; j < 4; j++) {
+        dst[j] = src[4 - j - 1];
+      }
+      src += 4;
+      dst += 4;
+
+      for (int j = 0; j < 4; j++) {
+        dst[j] = src[4 - j - 1];
+      }
+      src += 4;
+      dst += 4;
+
+      for (int j = 0; j < 8; j++) {
+        dst[j] = src[8 - j - 1];
+      }
+    }
+  } else {
+    // Bit-packed and single-byte data look the same in either endianness
+    memcpy(data_buffer_swapped, data_buffer, sizeof(data_buffer));
+  }
+
+  // Make an array wrapping the swapped buffer
+  auto empty = std::make_shared<arrow::Buffer>(nullptr, 0);
+  auto buffer =
+      std::make_shared<arrow::Buffer>(data_buffer_swapped, sizeof(data_buffer_swapped));
+  arrow::BufferVector buffers = {empty, buffer};
+  auto array_data =
+      std::make_shared<arrow::ArrayData>(arrow_data_type, n_elements_test, buffers, 0, 0);
+  auto array = arrow::MakeArray(array_data);
+
+  // Make a RecordBatch
+  auto arrow_schema = arrow::schema({arrow::field("col1", arrow_data_type)});
+  auto arrow_record_batch =
+      arrow::RecordBatch::Make(arrow_schema, n_elements_test, {array});
+
+  // Serialize it
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+  auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*arrow_record_batch, options);
+  if (!maybe_serialized.ok()) {
+    GTEST_FAIL() << maybe_serialized.status();
+  }
+  auto serialized = *maybe_serialized;
+
+  struct ArrowSchema schema;
+  if (!arrow::ExportSchema(*arrow_schema, &schema).ok()) {
+    GTEST_FAIL() << "schema export failed";
+  }
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = serialized->data();
+  data.size_bytes = serialized->size();
+
+  struct ArrowIpcDecoder decoder;
+  ArrowIpcDecoderInit(&decoder);
+  ASSERT_EQ(ArrowIpcDecoderSetSchema(&decoder, &schema, nullptr), NANOARROW_OK);
+
+  // Pretend the message was written with the non-system endianness so that the
+  // decoder has to swap the hand-swapped data buffer back. Use the runtime
+  // check rather than relying on a __BIG_ENDIAN__ define being present.
+  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) {
+    ASSERT_EQ(ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_BIG),
+              NANOARROW_OK);
+  } else {
+    ASSERT_EQ(ArrowIpcDecoderSetEndianness(&decoder, NANOARROW_IPC_ENDIANNESS_LITTLE),
+              NANOARROW_OK);
+  }
+
+  ASSERT_EQ(ArrowIpcDecoderDecodeHeader(&decoder, data, nullptr), NANOARROW_OK);
+  data.data.as_uint8 += decoder.header_size_bytes;
+  data.size_bytes -= decoder.header_size_bytes;
+
+  struct ArrowArrayView* array_view;
+  ASSERT_EQ(ArrowIpcDecoderDecodeArrayView(&decoder, data, 0, &array_view, nullptr),
+            NANOARROW_OK);
+  ASSERT_EQ(array_view->storage_type, data_type);
+
+  // Check buffer equality with our initial buffer (without reading past it)
+  ASSERT_LE(array_view->buffer_views[1].size_bytes,
+            static_cast<int64_t>(sizeof(data_buffer)));
+  EXPECT_EQ(memcmp(array_view->buffer_views[1].data.data, data_buffer,
+                   array_view->buffer_views[1].size_bytes),
+            0);
+
+  schema.release(&schema);
+  ArrowIpcDecoderReset(&decoder);
+}
+
+// One instantiation per data type. Between them they hit the decoder's
+// no-swap types (BOOL, INT8), the plain 16/32/64-bit swaps, the multi-word
+// decimal swap, and the per-field month/day/nano interval swap.
+INSTANTIATE_TEST_SUITE_P(
+    NanoarrowIpcTest, ArrowTypeIdParameterizedTestFixture,
+    ::testing::Values(NANOARROW_TYPE_BOOL,
+                      NANOARROW_TYPE_INT8,
+                      NANOARROW_TYPE_INT16,
+                      NANOARROW_TYPE_INT32,
+                      NANOARROW_TYPE_INT64,
+                      NANOARROW_TYPE_DECIMAL128,
+                      NANOARROW_TYPE_DECIMAL256,
+                      NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO));
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
index 6c6081d..3db8f46 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_files_test.cc
@@ -152,8 +152,7 @@ class TestFile {
io::RandomAccessFile::GetStream(buffer_reader, 0, content_copy_wrapped->size());
#endif
- auto maybe_reader =
- ipc::RecordBatchStreamReader::Open(input_stream);
+ auto maybe_reader = ipc::RecordBatchStreamReader::Open(input_stream);
if (!maybe_reader.ok()) {
GTEST_FAIL() << maybe_reader.status().message();
}
@@ -229,6 +228,23 @@ TEST_P(ArrowTestingPathParameterizedTestFixture, NanoarrowIpcTestFileNativeEndia
param.Test(dir_builder.str());
}
+// Reads each integration file from the arrow-testing directory whose
+// endianness is the opposite of this build's, so the decoder has to swap
+// every buffer.
+TEST_P(ArrowTestingPathParameterizedTestFixture, NanoarrowIpcTestFileSwapEndian) {
+  const char* testing_dir = getenv("NANOARROW_ARROW_TESTING_DIR");
+  if (testing_dir == nullptr || strlen(testing_dir) == 0) {
+    GTEST_SKIP() << "NANOARROW_ARROW_TESTING_DIR environment variable not set";
+  }
+
+  // Pick the integration subdirectory written with the non-native endianness
+  const char* swapped_endian_subdir =
+#if defined(__BIG_ENDIAN__)
+      "/data/arrow-ipc-stream/integration/1.0.0-littleendian";
+#else
+      "/data/arrow-ipc-stream/integration/1.0.0-bigendian";
+#endif
+
+  std::stringstream dir_builder;
+  dir_builder << testing_dir << swapped_endian_subdir;
+
+  TestFile param = GetParam();
+  param.Test(dir_builder.str());
+}
+
INSTANTIATE_TEST_SUITE_P(
NanoarrowIpcTest, ArrowTestingPathParameterizedTestFixture,
::testing::Values(
diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
index 231ff0c..f3e4a5a 100644
--- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
+++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader_test.cc
@@ -21,8 +21,6 @@
#include "nanoarrow_ipc.h"
-#include "flatcc/portable/pendian_detect.h"
-
static uint8_t kSimpleSchema[] = {
0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,
0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00,
@@ -163,14 +161,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasic) {
schema.release(&schema);
struct ArrowArray array;
- // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.length, 3);
array.release(&array);
-#else
- ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.release, nullptr);
@@ -205,14 +198,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasicNoSharedBuffers) {
schema.release(&schema);
struct ArrowArray array;
- // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.length, 3);
array.release(&array);
-#else
- ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.release, nullptr);
@@ -246,14 +234,9 @@ TEST(NanoarrowIpcReader, StreamReaderBasicWithEndOfStream) {
schema.release(&schema);
struct ArrowArray array;
- // TODO: Support endian swapping (GH-171)
-#if !defined(__BIG_ENDIAN__)
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.length, 3);
array.release(&array);
-#else
- ASSERT_EQ(stream.get_next(&stream, &array), ENOTSUP);
-#endif
ASSERT_EQ(stream.get_next(&stream, &array), NANOARROW_OK);
EXPECT_EQ(array.release, nullptr);