Posted to github@arrow.apache.org by "paleolimbot (via GitHub)" <gi...@apache.org> on 2023/03/02 17:44:35 UTC

[GitHub] [arrow-nanoarrow] paleolimbot opened a new pull request, #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

paleolimbot opened a new pull request, #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143

   Closes #91.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-nanoarrow] paleolimbot merged pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot merged PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143




[GitHub] [arrow-nanoarrow] lidavidm commented on a diff in pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129492452


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +876,261 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  private_data->last_message = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  struct ArrowIpcReaderPrivate* private_data =
+      (struct ArrowIpcReaderPrivate*)reader->private_data;
+
+  if (private_data->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&private_data->schema, out);

Review Comment:
   It does break expectations for Get though; usually you'd expect that to be idempotent
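The difference between a moving and a copying getter can be sketched in isolation. This is a minimal illustration, not the PR's code: `ToySchema` and the `Toy*` functions are hypothetical stand-ins that only borrow the "valid while `release != NULL`" convention from the Arrow C Data Interface. In nanoarrow itself, a copying getter would presumably be built on `ArrowSchemaDeepCopy()` rather than on anything shown here.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for an Arrow C Data Interface struct: the object is
 * considered valid while release != NULL. */
struct ToySchema {
  char* format;
  void (*release)(struct ToySchema*);
};

static void ToySchemaRelease(struct ToySchema* schema) {
  free(schema->format);
  schema->format = NULL;
  schema->release = NULL;
}

/* Allocate an owned copy of `format`; returns 0 on success, -1 on failure. */
static int ToySchemaInit(struct ToySchema* schema, const char* format) {
  assert(schema != NULL && format != NULL);
  size_t n = strlen(format) + 1;
  schema->format = (char*)malloc(n);
  if (schema->format == NULL) {
    schema->release = NULL;
    return -1;
  }
  memcpy(schema->format, format, n);
  schema->release = &ToySchemaRelease;
  return 0;
}

/* Move-style getter: ownership is transferred, so the source is left
 * released and a second call fails. */
static int ToyGetSchemaMove(struct ToySchema* src, struct ToySchema* out) {
  assert(src != NULL && out != NULL);
  if (src->release == NULL) return -1;
  *out = *src;
  src->format = NULL;
  src->release = NULL;
  return 0;
}

/* Copy-style getter: the source stays valid, so repeated calls succeed and
 * each caller releases its own copy. */
static int ToyGetSchemaCopy(const struct ToySchema* src, struct ToySchema* out) {
  assert(src != NULL && out != NULL);
  if (src->release == NULL) return -1;
  return ToySchemaInit(out, src->format);
}
```

With the copy variant, calling the getter twice yields two independent schemas; with the move variant, the second call returns an error because the reader no longer owns anything. The cost of idempotence is one allocation per call.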





[GitHub] [arrow-nanoarrow] lidavidm commented on a diff in pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129405480


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -80,8 +129,33 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
                                     struct ArrowBufferView data,
                                     struct ArrowError* error);
 
-#endif
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out, struct ArrowError* error);
+
+ArrowErrorCode ArrowIpcReaderGetArray(struct ArrowIpcReader* reader,
+                                      struct ArrowBufferView body, int64_t i,
+                                      struct ArrowArray* out, struct ArrowError* error);
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error);
+
+struct ArrowIpcField {
+  struct ArrowArrayView* array_view;
+  int64_t buffer_offset;
+};
+
+struct ArrowIpcReaderPrivate {

Review Comment:
   nit: maybe indicate that it's only here for testing?
   
   (alternatively, you could keep it in the .c file and copy-paste it to the .cc file in an `extern "C"` block)



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +876,261 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  private_data->last_message = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  struct ArrowIpcReaderPrivate* private_data =
+      (struct ArrowIpcReaderPrivate*)reader->private_data;
+
+  if (private_data->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&private_data->schema, out);

Review Comment:
   Do we really want Move here, or a copy?



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -205,6 +386,74 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowTypeRoundtrip) {
   ArrowIpcReaderReset(&reader);
 }
 
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
+  const std::shared_ptr<arrow::DataType>& data_type = GetParam();
+  std::shared_ptr<arrow::Schema> dummy_schema =
+      arrow::schema({arrow::field("dummy_name", data_type)});
+
+  auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema);
+  ASSERT_TRUE(maybe_empty.ok());
+  auto empty = maybe_empty.ValueUnsafe();
+
+  auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3);
+  ASSERT_TRUE(maybe_nulls_array.ok());
+  auto nulls =
+      arrow::RecordBatch::Make(dummy_schema, 3, {maybe_nulls_array.ValueUnsafe()});
+
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+
+  struct ArrowSchema schema;
+  struct ArrowIpcReader reader;
+  struct ArrowBufferView buffer_view;
+  struct ArrowArray array;
+
+  // Initialize the reader
+  ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok());
+  ArrowIpcReaderInit(&reader);
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);
+
+  // Check the empty array
+  auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += reader.header_size_bytes;
+  buffer_view.size_bytes -= reader.header_size_bytes;
+  ASSERT_EQ(ArrowIpcReaderGetArray(&reader, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty));
+
+  // Check the array with 3 null values
+  maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += reader.header_size_bytes;
+  buffer_view.size_bytes -= reader.header_size_bytes;
+  ASSERT_EQ(ArrowIpcReaderGetArray(&reader, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), nulls->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));
+
+  if (!maybe_batch.ValueUnsafe()->Equals(*nulls)) {
+    std::cout << "something";

Review Comment:
   ping





[GitHub] [arrow-nanoarrow] lidavidm commented on a diff in pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1124991112


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -50,19 +56,36 @@ enum ArrowIpcEndianness {
   NANOARROW_IPC_ENDIANNESS_BIG
 };
 
+enum ArrowIpcCompressionType {
+  NANOARROW_IPC_COMPRESSION_TYPE_NONE,
+  NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME,
+  NANOARROW_IPC_COMPRESSION_TYPE_ZSTD
+};
+
 #define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
 #define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
 
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
 
+struct ArrowIpcField {
+  struct ArrowArrayView* array_view;
+  int64_t buffer_offset;
+};
+
 struct ArrowIpcReader {
   int32_t metadata_version;
   int32_t message_type;
   int32_t endianness;
   int32_t features;
+  int32_t codec;
   int32_t header_size_bytes;
   int64_t body_size_bytes;
   struct ArrowSchema schema;
+  struct ArrowArrayView array_view;

Review Comment:
   Maybe it's time to document these fields? In particular, whether they're meant to be read-write, read-only, or write-only, too.



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.h:
##########
@@ -50,19 +56,36 @@ enum ArrowIpcEndianness {
   NANOARROW_IPC_ENDIANNESS_BIG
 };
 
+enum ArrowIpcCompressionType {
+  NANOARROW_IPC_COMPRESSION_TYPE_NONE,
+  NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME,
+  NANOARROW_IPC_COMPRESSION_TYPE_ZSTD
+};
+
 #define NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT 1
 #define NANOARROW_IPC_FEATURE_COMPRESSED_BODY 2
 
 ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error);
 
+struct ArrowIpcField {
+  struct ArrowArrayView* array_view;
+  int64_t buffer_offset;
+};
+
 struct ArrowIpcReader {
   int32_t metadata_version;
   int32_t message_type;
   int32_t endianness;
   int32_t features;
+  int32_t codec;

Review Comment:
   Maybe we should use the enum type here? Not that there's actually much of a difference, but it provides some documentation.



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
   EXPECT_STREQ(reader.schema.children[0]->format, "i");
 
+  EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.schema.release, nullptr);
+  EXPECT_NE(schema.release, nullptr);
+
+  schema.release(&schema);
+  ArrowIpcReaderReset(&reader);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcReader reader;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcReaderInit(&reader);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);
+
+  EXPECT_EQ(ArrowIpcReaderDecode(&reader, data, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(reader.header_size_bytes,
+            sizeof(kSimpleRecordBatch) - reader.body_size_bytes);
+  EXPECT_EQ(reader.body_size_bytes, 16);
+
+  EXPECT_EQ(reader.codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+
+  struct ArrowBufferView body;
+  body.data.as_uint8 = kSimpleRecordBatch + reader.header_size_bytes;
+  body.size_bytes = reader.body_size_bytes;
+
+  // Check full struct extract
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, body, -1, &array, nullptr), NANOARROW_OK);
+  EXPECT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  ASSERT_EQ(array.n_children, 1);
+  ASSERT_EQ(array.children[0]->n_buffers, 2);
+  ASSERT_EQ(array.children[0]->length, 3);
+  EXPECT_EQ(array.children[0]->null_count, 0);
+  const int32_t* out = reinterpret_cast<const int32_t*>(array.children[0]->buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Check field extract

Review Comment:
   Is the intent to map this down to a partial read later on? Otherwise, this feels like it just duplicates Nanoarrow's APIs
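One way to see how a per-field extract could later become a partial read is the flattened field table that `ArrowIpcReaderInitFields()` builds in this PR: each field records the index of its first buffer in the message's flat buffer list. The sketch below reproduces that pre-order walk on a simplified tree. `ToyNode`, `ToyField`, and `ToyInitFields` are hypothetical names, and the buffer counts are supplied by hand rather than derived from a real layout.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical schema node: how many buffers it owns, plus its children. */
struct ToyNode {
  int n_buffers;
  int n_children;
  const struct ToyNode* const* children;
};

/* One entry per node, in pre-order. */
struct ToyField {
  const struct ToyNode* node;
  int64_t buffer_offset; /* index of this node's first buffer in the flat list */
};

/* Pre-order walk: record where each node's buffers start, then advance the
 * running buffer count by the number of buffers the node owns. */
static void ToyInitFields(struct ToyField* fields, const struct ToyNode* node,
                          int64_t* n_fields, int64_t* n_buffers) {
  assert(fields != NULL && node != NULL);
  fields[*n_fields].node = node;
  fields[*n_fields].buffer_offset = *n_buffers;
  *n_fields += 1;
  *n_buffers += node->n_buffers;

  for (int i = 0; i < node->n_children; i++) {
    ToyInitFields(fields, node->children[i], n_fields, n_buffers);
  }
}
```

Given such a table, the buffers belonging to one column can be located by index without visiting the columns before it, which is the property a partial read would need. Whether the extension ends up using the table that way is not settled in this thread.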



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -205,6 +386,74 @@ TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowTypeRoundtrip) {
   ArrowIpcReaderReset(&reader);
 }
 
+TEST_P(ArrowTypeParameterizedTestFixture, NanoarrowIpcArrowArrayRoundtrip) {
+  const std::shared_ptr<arrow::DataType>& data_type = GetParam();
+  std::shared_ptr<arrow::Schema> dummy_schema =
+      arrow::schema({arrow::field("dummy_name", data_type)});
+
+  auto maybe_empty = arrow::RecordBatch::MakeEmpty(dummy_schema);
+  ASSERT_TRUE(maybe_empty.ok());
+  auto empty = maybe_empty.ValueUnsafe();
+
+  auto maybe_nulls_array = arrow::MakeArrayOfNull(data_type, 3);
+  ASSERT_TRUE(maybe_nulls_array.ok());
+  auto nulls =
+      arrow::RecordBatch::Make(dummy_schema, 3, {maybe_nulls_array.ValueUnsafe()});
+
+  auto options = arrow::ipc::IpcWriteOptions::Defaults();
+
+  struct ArrowSchema schema;
+  struct ArrowIpcReader reader;
+  struct ArrowBufferView buffer_view;
+  struct ArrowArray array;
+
+  // Initialize the reader
+  ASSERT_TRUE(arrow::ExportSchema(*dummy_schema, &schema).ok());
+  ArrowIpcReaderInit(&reader);
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);
+
+  // Check the empty array
+  auto maybe_serialized = arrow::ipc::SerializeRecordBatch(*empty, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += reader.header_size_bytes;
+  buffer_view.size_bytes -= reader.header_size_bytes;
+  ASSERT_EQ(ArrowIpcReaderGetArray(&reader, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  auto maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), empty->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*empty));
+
+  // Check the array with 3 null values
+  maybe_serialized = arrow::ipc::SerializeRecordBatch(*nulls, options);
+  ASSERT_TRUE(maybe_serialized.ok());
+  buffer_view.data.data = maybe_serialized.ValueUnsafe()->data();
+  buffer_view.size_bytes = maybe_serialized.ValueOrDie()->size();
+
+  ASSERT_EQ(ArrowIpcReaderDecode(&reader, buffer_view, nullptr), NANOARROW_OK);
+  buffer_view.data.as_uint8 += reader.header_size_bytes;
+  buffer_view.size_bytes -= reader.header_size_bytes;
+  ASSERT_EQ(ArrowIpcReaderGetArray(&reader, buffer_view, -1, &array, nullptr),
+            NANOARROW_OK);
+
+  maybe_batch = arrow::ImportRecordBatch(&array, dummy_schema);
+  ASSERT_TRUE(maybe_batch.ok());
+  EXPECT_EQ(maybe_batch.ValueUnsafe()->ToString(), nulls->ToString());
+  EXPECT_TRUE(maybe_batch.ValueUnsafe()->Equals(*nulls));
+
+  if (!maybe_batch.ValueUnsafe()->Equals(*nulls)) {
+    std::cout << "something";

Review Comment:
   `FAIL() << "something descriptive goes here";`?



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");

Review Comment:
   Ideally we wouldn't have to copy the data, right?



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcReaderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                      struct ArrowArray* array,
+                                      struct ArrowError* error) {
+  ns(FieldNode_struct_t) field =
+      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
+  array->length = ns(FieldNode_length(field));
+  array->null_count = ns(FieldNode_null_count(field));
+  setter->field_i += 1;
+
+  for (int64_t i = 0; i < array->n_buffers; i++) {
+    ns(Buffer_struct_t) buffer =
+        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
+    int64_t buffer_offset = ns(Buffer_offset(buffer));
+    int64_t buffer_length = ns(Buffer_length(buffer));
+    setter->buffer_i += 1;
+
+    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);
+    NANOARROW_RETURN_NOT_OK(ArrowIpcReaderMakeBuffer(setter, buffer_offset, buffer_length,
+                                                     buffer_dst, error));
+  }
+
+  for (int64_t i = 0; i < array->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcReaderWalkGetArray(setter, array->children[i], error));
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(array, array_view->storage_type));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(array, array_view->n_children));
+  for (int64_t i = 0; i < array_view->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcArrayInitFromArrayView(array->children[i], array_view->children[i]));
+  }
+
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetArray(struct ArrowIpcReader* reader,
+                                      struct ArrowBufferView body, int64_t field_i,
+                                      struct ArrowArray* out, struct ArrowError* error) {
+  if (reader->private_data == NULL ||
+      reader->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "reader did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))reader->private_data;
+
+  // RecordBatch messages don't count the root node but reader->fields does
+  struct ArrowIpcField* root = reader->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = reader->codec;
+  setter.endianness = reader->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the
+  // children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcReaderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcReaderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it to do

Review Comment:
   We might defer that to a ValidateFull-style function in core nanoarrow itself?
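As an illustration of what a ValidateFull-style check might cover beyond buffer sizes, the sketch below verifies the offsets buffer of a variable-length array (such as a string array with 32-bit offsets). `ToyValidateOffsets` is a hypothetical helper written for this example; it is not an existing nanoarrow function, and a real implementation would have to handle every layout, not just this one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Check that `offsets` (length + 1 entries) describes valid, ordered slices
 * of a data buffer holding data_size_bytes bytes.
 * Returns 0 if the offsets are usable and -1 otherwise. */
static int ToyValidateOffsets(const int32_t* offsets, int64_t length,
                              int64_t data_size_bytes) {
  if (length < 0 || data_size_bytes < 0) return -1;

  /* A zero-length array may legitimately have no offsets buffer at all. */
  if (length == 0) return 0;
  if (offsets == NULL) return -1;

  /* The first offset must not be negative. */
  if (offsets[0] < 0) return -1;

  /* Offsets must never decrease, otherwise an element has negative size. */
  for (int64_t i = 0; i < length; i++) {
    if (offsets[i + 1] < offsets[i]) return -1;
  }

  /* The last offset must not point past the end of the data buffer. */
  if (offsets[length] > data_size_bytes) return -1;

  return 0;
}
```

A check like this costs a full pass over the offsets, which is presumably why it belongs in an opt-in validation function rather than in the decode path itself.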



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
   EXPECT_STREQ(reader.schema.children[0]->format, "i");
 
+  EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.schema.release, nullptr);
+  EXPECT_NE(schema.release, nullptr);
+
+  schema.release(&schema);
+  ArrowIpcReaderReset(&reader);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcReader reader;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcReaderInit(&reader);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);

Review Comment:
   Is the assumption that the caller is responsible for all I/O, so it has to decode the schema and feed it back in, and we can build up a higher level reader on top of this interface?



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
   EXPECT_STREQ(reader.schema.children[0]->format, "i");
 
+  EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.schema.release, nullptr);
+  EXPECT_NE(schema.release, nullptr);
+
+  schema.release(&schema);
+  ArrowIpcReaderReset(&reader);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcReader reader;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcReaderInit(&reader);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);
+
+  EXPECT_EQ(ArrowIpcReaderDecode(&reader, data, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.message_type, NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH);
+  EXPECT_EQ(reader.header_size_bytes,
+            sizeof(kSimpleRecordBatch) - reader.body_size_bytes);
+  EXPECT_EQ(reader.body_size_bytes, 16);
+
+  EXPECT_EQ(reader.codec, NANOARROW_IPC_COMPRESSION_TYPE_NONE);
+
+  struct ArrowBufferView body;
+  body.data.as_uint8 = kSimpleRecordBatch + reader.header_size_bytes;
+  body.size_bytes = reader.body_size_bytes;
+
+  // Check full struct extract
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, body, -1, &array, nullptr), NANOARROW_OK);
+  EXPECT_EQ(array.length, 3);
+  EXPECT_EQ(array.null_count, 0);
+  ASSERT_EQ(array.n_children, 1);
+  ASSERT_EQ(array.children[0]->n_buffers, 2);
+  ASSERT_EQ(array.children[0]->length, 3);
+  EXPECT_EQ(array.children[0]->null_count, 0);
+  const int32_t* out = reinterpret_cast<const int32_t*>(array.children[0]->buffers[1]);
+  EXPECT_EQ(out[0], 1);
+  EXPECT_EQ(out[1], 2);
+  EXPECT_EQ(out[2], 3);
+
+  array.release(&array);
+
+  // Check field extract

Review Comment:
   Ah, I see in the actual implementation it's intended to save a bit of deserialization work.



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");

Review Comment:
   (I suppose you'd want it under user control: if you only want one buffer out of many, then copying might let you save memory later on. But then the interface is a little wonky, because you have to provide the full record batch up front, implying you already did all the I/O, unless you have memory mapping available.)





[GitHub] [arrow-nanoarrow] paleolimbot commented on a diff in pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129429566


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");

Review Comment:
   Yeah, this should be extensible somehow...a `struct ArrowIpcBufferDecoder` or something. It also needs a way to inject compression support at runtime. I'll do both of those in a follow-up (this PR is mostly about ensuring type coverage and correctness of the flattening/unflattening).
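
   A minimal sketch of what such an extension point could look like, assuming a callback-based design. Every name below (`SketchView`, `SketchBuffer`, `SketchBufferDecoder`, and the functions) is hypothetical and is not part of nanoarrow; the sketch only illustrates letting the caller choose between copying a body slice and borrowing it (zero copy). The bounds check is also written so that `offset + length` cannot overflow.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

// Hypothetical types for illustration only; none of these are nanoarrow API.
struct SketchView {
  const uint8_t* data;
  int64_t size_bytes;
};

struct SketchBuffer {
  const uint8_t* data;
  int64_t size_bytes;
  int owns_data;  // 1 if data was allocated by the decoder, 0 if borrowed
};

// Pluggable strategy: given a slice of the message body, populate `out`.
struct SketchBufferDecoder {
  int (*make_buffer)(struct SketchBufferDecoder* self, struct SketchView src,
                     struct SketchBuffer* out);
  void* private_data;
};

// Strategy 1: copy the bytes so the result can outlive the message body.
static int SketchCopyBuffer(struct SketchBufferDecoder* self, struct SketchView src,
                            struct SketchBuffer* out) {
  (void)self;
  uint8_t* copy = (uint8_t*)malloc((size_t)src.size_bytes);
  if (copy == NULL) return ENOMEM;
  memcpy(copy, src.data, (size_t)src.size_bytes);
  out->data = copy;
  out->size_bytes = src.size_bytes;
  out->owns_data = 1;
  return 0;
}

// Strategy 2: borrow the bytes (zero copy); the caller must keep the body alive.
static int SketchBorrowBuffer(struct SketchBufferDecoder* self, struct SketchView src,
                              struct SketchBuffer* out) {
  (void)self;
  out->data = src.data;
  out->size_bytes = src.size_bytes;
  out->owns_data = 0;
  return 0;
}

// Validate [offset, offset + length) against the body, then delegate.
static int SketchDecodeBuffer(struct SketchBufferDecoder* decoder,
                              struct SketchView body, int64_t offset, int64_t length,
                              struct SketchBuffer* out) {
  out->data = NULL;
  out->size_bytes = 0;
  out->owns_data = 0;
  if (length == 0) return 0;  // empty buffers need no body bytes

  // Compare against (size - offset) rather than computing offset + length,
  // which could overflow for hostile input.
  if (offset < 0 || length < 0 || offset > body.size_bytes ||
      length > body.size_bytes - offset) {
    return EINVAL;
  }

  struct SketchView src = {body.data + offset, length};
  return decoder->make_buffer(decoder, src, out);
}

static void SketchBufferRelease(struct SketchBuffer* buffer) {
  if (buffer->owns_data) free((void*)buffer->data);
  buffer->data = NULL;
  buffer->size_bytes = 0;
  buffer->owns_data = 0;
}
```

   With this shape, a compression codec could presumably be injected the same way, as another `make_buffer` implementation that decompresses `src` into an owned buffer.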



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_test.cc:
##########
@@ -168,6 +204,151 @@ TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleSchema) {
   EXPECT_EQ(reader.schema.children[0]->flags, ARROW_FLAG_NULLABLE);
   EXPECT_STREQ(reader.schema.children[0]->format, "i");
 
+  EXPECT_EQ(ArrowIpcReaderGetSchema(&reader, &schema, &error), NANOARROW_OK);
+  EXPECT_EQ(reader.schema.release, nullptr);
+  EXPECT_NE(schema.release, nullptr);
+
+  schema.release(&schema);
+  ArrowIpcReaderReset(&reader);
+}
+
+TEST(NanoarrowIpcTest, NanoarrowIpcDecodeSimpleRecordBatch) {
+  struct ArrowIpcReader reader;
+  struct ArrowError error;
+  struct ArrowSchema schema;
+  struct ArrowArray array;
+
+  ArrowSchemaInit(&schema);
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema, 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema.children[0], NANOARROW_TYPE_INT32), NANOARROW_OK);
+
+  struct ArrowBufferView data;
+  data.data.as_uint8 = kSimpleRecordBatch;
+  data.size_bytes = sizeof(kSimpleRecordBatch);
+
+  ArrowIpcReaderInit(&reader);
+
+  // Attempt to get array should fail nicely here
+  EXPECT_EQ(ArrowIpcReaderGetArray(&reader, data, 0, nullptr, &error), EINVAL);
+  EXPECT_STREQ(error.message, "reader did not just decode a RecordBatch message");
+
+  ASSERT_EQ(ArrowIpcReaderSetSchema(&reader, &schema, nullptr), NANOARROW_OK);

Review Comment:
   I think so...what's there now might more accurately be called an `ArrowIpcDecoder`. There are so many ways to mix, match, and parallelize the various steps here based on the tools available to the caller. This library should probably provide a reader for the simple but common "just read it all as an ArrowArrayStream" case (which would at least make sure that it's possible to do).



##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +847,252 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  reader->private_data = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  if (reader->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&reader->schema, out);
+  return NANOARROW_OK;
+}
+
+static void ArrowIpcReaderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
+  *n_fields += 1;
+  for (int64_t i = 0; i < schema->n_children; i++) {
+    ArrowIpcReaderCountFields(schema->children[i], n_fields);
+  }
+}
+
+static void ArrowIpcReaderInitFields(struct ArrowIpcField* fields,
+                                     struct ArrowArrayView* view, int64_t* n_fields,
+                                     int64_t* n_buffers) {
+  struct ArrowIpcField* field = fields + (*n_fields);
+  field->array_view = view;
+  field->buffer_offset = *n_buffers;
+
+  for (int i = 0; i < 3; i++) {
+    *n_buffers += view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
+  }
+
+  *n_fields += 1;
+
+  for (int64_t i = 0; i < view->n_children; i++) {
+    ArrowIpcReaderInitFields(fields, view->children[i], n_fields, n_buffers);
+  }
+}
+
+ArrowErrorCode ArrowIpcReaderSetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* schema,
+                                       struct ArrowError* error) {
+  ArrowArrayViewReset(&reader->array_view);
+
+  if (reader->fields != NULL) {
+    ArrowFree(reader->fields);
+  }
+
+  // Allocate Array and ArrayView based on schema without moving the schema
+  // this will fail if the schema is not valid.
+  NANOARROW_RETURN_NOT_OK(
+      ArrowArrayViewInitFromSchema(&reader->array_view, schema, error));
+
+  // Root must be a struct
+  if (reader->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
+    ArrowErrorSet(error, "schema must be a struct type");
+    return EINVAL;
+  }
+
+  // Walk tree and calculate how many fields we need to allocate
+  reader->n_fields = 0;
+  ArrowIpcReaderCountFields(schema, &reader->n_fields);
+  reader->fields =
+      (struct ArrowIpcField*)ArrowMalloc(reader->n_fields * sizeof(struct ArrowIpcField));
+  if (reader->fields == NULL) {
+    ArrowErrorSet(error, "Failed to allocate reader->fields");
+    return ENOMEM;
+  }
+  memset(reader->fields, 0, reader->n_fields * sizeof(struct ArrowIpcField));
+
+  // Init field information and calculate starting buffer offset for each
+  int64_t field_i = 0;
+  ArrowIpcReaderInitFields(reader->fields, &reader->array_view, &field_i,
+                           &reader->n_buffers);
+
+  return NANOARROW_OK;
+}
+
+struct ArrowIpcArraySetter {
+  ns(FieldNode_vec_t) fields;
+  int64_t field_i;
+  ns(Buffer_vec_t) buffers;
+  int64_t buffer_i;
+  struct ArrowBufferView body;
+  enum ArrowIpcCompressionType codec;
+  enum ArrowIpcEndianness endianness;
+  enum ArrowIpcEndianness system_endianness;
+};
+
+static int ArrowIpcReaderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
+                                    int64_t length, struct ArrowBuffer* out,
+                                    struct ArrowError* error) {
+  if (length == 0) {
+    return NANOARROW_OK;
+  }
+
+  // Check that this buffer fits within the body
+  if (offset < 0 || (offset + length) > setter->body.size_bytes) {
+    ArrowErrorSet(error,
+                  "Buffer %ld requires body offsets [%ld..%ld) but body has size %ld",
+                  (long)setter->buffer_i - 1, (long)offset, (long)offset + (long)length,
+                  setter->body.size_bytes);
+    return EINVAL;
+  }
+
+  struct ArrowBufferView view;
+  view.data.as_uint8 = setter->body.data.as_uint8 + offset;
+  view.size_bytes = length;
+
+  if (setter->codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
+    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
+    return ENOTSUP;
+  }
+
+  if (setter->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED &&
+      setter->endianness != setter->system_endianness) {
+    ArrowErrorSet(error,
+                  "The nanoarrow_ipc extension does not support non-system endianness");
+    return ENOTSUP;
+  }
+
+  int result = ArrowBufferAppendBufferView(out, view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to copy buffer");
+    return result;
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcReaderWalkGetArray(struct ArrowIpcArraySetter* setter,
+                                      struct ArrowArray* array,
+                                      struct ArrowError* error) {
+  ns(FieldNode_struct_t) field =
+      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
+  array->length = ns(FieldNode_length(field));
+  array->null_count = ns(FieldNode_null_count(field));
+  setter->field_i += 1;
+
+  for (int64_t i = 0; i < array->n_buffers; i++) {
+    ns(Buffer_struct_t) buffer =
+        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
+    int64_t buffer_offset = ns(Buffer_offset(buffer));
+    int64_t buffer_length = ns(Buffer_length(buffer));
+    setter->buffer_i += 1;
+
+    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);
+    NANOARROW_RETURN_NOT_OK(ArrowIpcReaderMakeBuffer(setter, buffer_offset, buffer_length,
+                                                     buffer_dst, error));
+  }
+
+  for (int64_t i = 0; i < array->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcReaderWalkGetArray(setter, array->children[i], error));
+  }
+
+  return NANOARROW_OK;
+}
+
+static int ArrowIpcArrayInitFromArrayView(struct ArrowArray* array,
+                                          struct ArrowArrayView* array_view) {
+  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromType(array, array_view->storage_type));
+  NANOARROW_RETURN_NOT_OK(ArrowArrayAllocateChildren(array, array_view->n_children));
+  for (int64_t i = 0; i < array_view->n_children; i++) {
+    NANOARROW_RETURN_NOT_OK(
+        ArrowIpcArrayInitFromArrayView(array->children[i], array_view->children[i]));
+  }
+
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetArray(struct ArrowIpcReader* reader,
+                                      struct ArrowBufferView body, int64_t field_i,
+                                      struct ArrowArray* out, struct ArrowError* error) {
+  if (reader->private_data == NULL ||
+      reader->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
+    ArrowErrorSet(error, "reader did not just decode a RecordBatch message");
+    return EINVAL;
+  }
+
+  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))reader->private_data;
+
+  // RecordBatch messages don't count the root node but reader->fields does
+  struct ArrowIpcField* root = reader->fields + field_i + 1;
+
+  struct ArrowArray temp;
+  temp.release = NULL;
+  int result = ArrowIpcArrayInitFromArrayView(&temp, root->array_view);
+  if (result != NANOARROW_OK) {
+    ArrowErrorSet(error, "Failed to initialize output array");
+    return result;
+  }
+
+  struct ArrowIpcArraySetter setter;
+  setter.fields = ns(RecordBatch_nodes(batch));
+  setter.field_i = field_i;
+  setter.buffers = ns(RecordBatch_buffers(batch));
+  setter.buffer_i = root->buffer_offset - 1;
+  setter.body = body;
+  setter.codec = reader->codec;
+  setter.endianness = reader->endianness;
+
+  // This should probably be done at compile time
+  uint32_t check = 1;
+  char first_byte;
+  memcpy(&first_byte, &check, sizeof(char));
+  if (first_byte) {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_LITTLE;
+  } else {
+    setter.system_endianness = NANOARROW_IPC_ENDIANNESS_BIG;
+  }
+
+  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the
+  // children ourselves
+  if (field_i == -1) {
+    temp.length = ns(RecordBatch_length(batch));
+    temp.null_count = 0;
+    setter.field_i++;
+    setter.buffer_i++;
+
+    for (int64_t i = 0; i < temp.n_children; i++) {
+      result = ArrowIpcReaderWalkGetArray(&setter, temp.children[i], error);
+      if (result != NANOARROW_OK) {
+        temp.release(&temp);
+        return result;
+      }
+    }
+  } else {
+    result = ArrowIpcReaderWalkGetArray(&setter, &temp, error);
+    if (result != NANOARROW_OK) {
+      temp.release(&temp);
+      return result;
+    }
+  }
+
+  // TODO: this performs some validation but doesn't do everything we need it to do

Review Comment:
   Definitely!





[GitHub] [arrow-nanoarrow] paleolimbot commented on pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#issuecomment-1460467246

   Ok...I renamed a bunch of things because what I wrote was more accurately called a Decoder. The interface isn't perfect yet, but before it was confusing because "read" and "get" imply behaviour that didn't exist. The idea is basically that the decoder works on a one-message-at-a-time basis with minimal internal state...various "readers" could be implemented on top of it based on the features they have available.
   
   Three obvious follow-ups to this are:
   
   - The ability to inject custom buffer reading and/or zero-copy buffer references
   - The ability to inject compression support
   - A simple single-threaded "reader" that manages IO and can read a stream of messages as an ArrowArrayStream
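
   As a rough illustration of the first step such a "reader" would perform before handing bytes to the decoder, here is a sketch that inspects the 8-byte prefix of an encapsulated IPC message. It assumes the current stream framing (a `0xFFFFFFFF` continuation marker followed by a little-endian 32-bit metadata size, where a size of 0 marks end of stream). The function names are hypothetical and not nanoarrow API.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>

// Hypothetical helper: read a little-endian uint32 regardless of host endianness.
static uint32_t SketchLoadLe32(const uint8_t* p) {
  return (uint32_t)p[0] | ((uint32_t)p[1] << 8) | ((uint32_t)p[2] << 16) |
         ((uint32_t)p[3] << 24);
}

// Hypothetical helper: peek at a message prefix so that a reader knows how many
// metadata bytes to fetch next. Returns 0 on success and EINVAL if fewer than
// 8 bytes are available or the continuation marker is missing.
static int SketchPeekMessagePrefix(const uint8_t* data, int64_t size_bytes,
                                   int32_t* metadata_size_out) {
  if (size_bytes < 8) return EINVAL;

  if (SketchLoadLe32(data) != 0xFFFFFFFFu) return EINVAL;

  uint32_t metadata_size = SketchLoadLe32(data + 4);
  if (metadata_size > (uint32_t)INT32_MAX) return EINVAL;

  // A metadata size of 0 signals end of stream in this framing.
  *metadata_size_out = (int32_t)metadata_size;
  return 0;
}
```

   A reader built this way would then fetch `metadata_size` bytes, let the decoder parse the header to learn the body size, and only then read the body, which keeps all I/O outside the decoder.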




[GitHub] [arrow-nanoarrow] paleolimbot commented on a diff in pull request #143: feat(extensions/nanoarrow_ipc): Decode RecordBatch message to ArrowArray

Posted by "paleolimbot (via GitHub)" <gi...@apache.org>.
paleolimbot commented on code in PR #143:
URL: https://github.com/apache/arrow-nanoarrow/pull/143#discussion_r1129451619


##########
extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc.c:
##########
@@ -785,5 +876,261 @@ ArrowErrorCode ArrowIpcReaderDecode(struct ArrowIpcReader* reader,
       return EINVAL;
   }
 
+  private_data->last_message = message_header;
+  return NANOARROW_OK;
+}
+
+ArrowErrorCode ArrowIpcReaderGetSchema(struct ArrowIpcReader* reader,
+                                       struct ArrowSchema* out,
+                                       struct ArrowError* error) {
+  struct ArrowIpcReaderPrivate* private_data =
+      (struct ArrowIpcReaderPrivate*)reader->private_data;
+
+  if (private_data->schema.release == NULL) {
+    ArrowErrorSet(error, "reader does not contain a valid schema");
+    return EINVAL;
+  }
+
+  ArrowSchemaMove(&private_data->schema, out);

Review Comment:
   Until there's a code path where the decoder needs a copy of the schema for something, probably a move? Strictly speaking, the decoder just needs to "see" the schema once and doesn't need to refer to it again.
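
   For readers unfamiliar with the "move" convention being discussed, here is a minimal sketch of how a move works for structs that follow the Arrow C data interface rule that `release == NULL` means "released". The `SketchSchema` type and both functions are hypothetical stand-ins, not the real `ArrowSchema` or `ArrowSchemaMove()`.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Hypothetical stand-in for a struct with a release callback; a non-NULL
// release pointer means the struct is valid and owns its resources.
struct SketchSchema {
  const char* format;
  void (*release)(struct SketchSchema*);
};

// Release callback: free resources (none here) and mark the struct released.
static void SketchSchemaRelease(struct SketchSchema* schema) {
  schema->format = NULL;
  schema->release = NULL;
}

// Move: shallow-copy src into dst, then mark src as released WITHOUT calling
// its release callback, so ownership is transferred rather than duplicated.
static void SketchSchemaMove(struct SketchSchema* src, struct SketchSchema* dst) {
  memcpy(dst, src, sizeof(struct SketchSchema));
  src->release = NULL;
}
```

   After the move, the source holds nothing and the destination is responsible for calling `release`, which matches what the test in this PR checks (`reader.schema.release` is `nullptr` and `schema.release` is not).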


