You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/12/01 13:01:35 UTC

(arrow-nanoarrow) branch main updated: feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)

This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c3f5d5  feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)
5c3f5d5 is described below

commit 5c3f5d546bdb25625350926bf16f736837733901
Author: Dewey Dunnington <de...@dunnington.ca>
AuthorDate: Fri Dec 1 09:01:30 2023 -0400

    feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)
    
    This PR adds a few higher level functions that were missing:
    `WriteDataFile()`, `ReadDataFile()`, and `ReadBatch()`. I think these
    are the last pieces of JSON that have to be read in order to implement
    golden file tests with at least a few arrow-testing files!
    
    ---------
    
    Co-authored-by: Benjamin Kietzman <be...@gmail.com>
---
 src/nanoarrow/nanoarrow_testing.hpp     | 161 ++++++++++++++++++++++++++++++++
 src/nanoarrow/nanoarrow_testing_test.cc |  73 +++++++++++++++
 2 files changed, 234 insertions(+)

diff --git a/src/nanoarrow/nanoarrow_testing.hpp b/src/nanoarrow/nanoarrow_testing.hpp
index 103f22e..15580b6 100644
--- a/src/nanoarrow/nanoarrow_testing.hpp
+++ b/src/nanoarrow/nanoarrow_testing.hpp
@@ -46,6 +46,48 @@ namespace testing {
 /// \brief Writer for the Arrow integration testing JSON format
 class TestingJSONWriter {
  public:
+  /// \brief Write an ArrowArrayStream as a data file JSON object to out
+  ///
+  /// Creates output like `{"schema": {...}, "batches": [...], ...}`.
+  ArrowErrorCode WriteDataFile(std::ostream& out, ArrowArrayStream* stream) {
+    if (stream == nullptr || stream->release == nullptr) {
+      return EINVAL;
+    }
+
+    out << R"({"schema": )";
+
+    nanoarrow::UniqueSchema schema;
+    NANOARROW_RETURN_NOT_OK(stream->get_schema(stream, schema.get()));
+    NANOARROW_RETURN_NOT_OK(WriteSchema(out, schema.get()));
+
+    nanoarrow::UniqueArrayView array_view;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), nullptr));
+
+    out << R"(, "batches": [)";
+
+    nanoarrow::UniqueArray array;
+    std::string sep;
+    do {
+      NANOARROW_RETURN_NOT_OK(stream->get_next(stream, array.get()));
+      if (array->release == nullptr) {
+        break;
+      }
+
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayViewSetArray(array_view.get(), array.get(), nullptr));
+
+      out << sep;
+      sep = ", ";
+      NANOARROW_RETURN_NOT_OK(WriteBatch(out, schema.get(), array_view.get()));
+      array.reset();
+    } while (true);
+
+    out << "]}";
+
+    return NANOARROW_OK;
+  }
+
   /// \brief Write a schema to out
   ///
   /// Creates output like `{"fields": [...], "metadata": [...]}`.
@@ -622,6 +664,57 @@ class TestingJSONReader {
   using json = nlohmann::json;
 
  public:
+  /// \brief Read JSON representing a data file object
+  ///
+  /// Read a JSON object in the form `{"schema": {...}, "batches": [...], ...}`,
+  /// propagating `out` on success.
+  ArrowErrorCode ReadDataFile(const std::string& data_file_json, ArrowArrayStream* out,
+                              ArrowError* error = nullptr) {
+    try {
+      auto obj = json::parse(data_file_json);
+      NANOARROW_RETURN_NOT_OK(Check(obj.is_object(), error, "data file must be object"));
+      NANOARROW_RETURN_NOT_OK(
+          Check(obj.contains("schema"), error, "data file missing key 'schema'"));
+
+      // Read Schema
+      nanoarrow::UniqueSchema schema;
+      NANOARROW_RETURN_NOT_OK(SetSchema(schema.get(), obj["schema"], error));
+
+      NANOARROW_RETURN_NOT_OK(
+          Check(obj.contains("batches"), error, "data file missing key 'batches'"));
+      const auto& batches = obj["batches"];
+      NANOARROW_RETURN_NOT_OK(
+          Check(batches.is_array(), error, "data file batches must be array"));
+
+      // Populate ArrayView
+      nanoarrow::UniqueArrayView array_view;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), error));
+
+      // Initialize ArrayStream with required capacity
+      nanoarrow::UniqueArrayStream stream;
+      NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+          ArrowBasicArrayStreamInit(stream.get(), schema.get(), batches.size()), error);
+
+      // Populate ArrayStream batches
+      for (size_t i = 0; i < batches.size(); i++) {
+        nanoarrow::UniqueArray array;
+        NANOARROW_RETURN_NOT_OK(
+            ArrowArrayInitFromArrayView(array.get(), array_view.get(), error));
+        NANOARROW_RETURN_NOT_OK(
+            SetArrayBatch(batches[i], array_view.get(), array.get(), error));
+        ArrowBasicArrayStreamSetArray(stream.get(), i, array.get());
+      }
+
+      ArrowArrayStreamMove(stream.get(), out);
+      return NANOARROW_OK;
+    } catch (json::exception& e) {
+      ArrowErrorSet(error, "Exception in TestingJSONReader::ReadDataFile(): %s",
+                    e.what());
+      return EINVAL;
+    }
+  }
+
   /// \brief Read JSON representing a Schema
   ///
   /// Reads a JSON object in the form `{"fields": [...], "metadata": [...]}`,
@@ -660,6 +753,34 @@ class TestingJSONReader {
     }
   }
 
+  /// \brief Read JSON representing a RecordBatch
+  ///
+  /// Read a JSON object in the form `{"count": 123, "columns": [...]}`, propagating `out`
+  /// on success.
+  ArrowErrorCode ReadBatch(const std::string& batch_json, const ArrowSchema* schema,
+                           ArrowArray* out, ArrowError* error = nullptr) {
+    try {
+      auto obj = json::parse(batch_json);
+
+      // ArrowArrayView to enable validation
+      nanoarrow::UniqueArrayView array_view;
+      NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromSchema(
+          array_view.get(), const_cast<ArrowSchema*>(schema), error));
+
+      // ArrowArray to hold memory
+      nanoarrow::UniqueArray array;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayInitFromSchema(array.get(), const_cast<ArrowSchema*>(schema), error));
+
+      NANOARROW_RETURN_NOT_OK(SetArrayBatch(obj, array_view.get(), array.get(), error));
+      ArrowArrayMove(array.get(), out);
+      return NANOARROW_OK;
+    } catch (json::exception& e) {
+      ArrowErrorSet(error, "Exception in TestingJSONReader::ReadBatch(): %s", e.what());
+      return EINVAL;
+    }
+  }
+
   /// \brief Read JSON representing a Column
   ///
   /// Read a JSON object in the form
@@ -1094,6 +1215,46 @@ class TestingJSONReader {
     return NANOARROW_OK;
   }
 
+  ArrowErrorCode SetArrayBatch(const json& value, ArrowArrayView* array_view,
+                               ArrowArray* array, ArrowError* error) {
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.is_object(), error, "Expected RecordBatch to be a JSON object"));
+
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.contains("count"), error, "RecordBatch missing key 'count'"));
+
+    const auto& count = value["count"];
+    NANOARROW_RETURN_NOT_OK(
+        Check(count.is_number_integer(), error, "RecordBatch count must be integer"));
+    array_view->length = count.get<int64_t>();
+
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.contains("columns"), error, "RecordBatch missing key 'columns'"));
+
+    const auto& columns = value["columns"];
+    NANOARROW_RETURN_NOT_OK(
+        Check(columns.is_array(), error, "RecordBatch columns must be array"));
+    NANOARROW_RETURN_NOT_OK(Check(columns.size() == array_view->n_children, error,
+                                  "RecordBatch children has incorrect size"));
+
+    for (int64_t i = 0; i < array_view->n_children; i++) {
+      NANOARROW_RETURN_NOT_OK(
+          SetArrayColumn(columns[i], array_view->children[i], array->children[i], error));
+    }
+
+    // Validate the array view
+    NANOARROW_RETURN_NOT_OK(PrefixError(
+        ArrowArrayViewValidate(array_view, NANOARROW_VALIDATION_LEVEL_FULL, error), error,
+        "RecordBatch failed to validate: "));
+
+    // Flush length and buffer pointers to the Array
+    array->length = array_view->length;
+    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+        ArrowArrayFinishBuilding(array, NANOARROW_VALIDATION_LEVEL_NONE, nullptr), error);
+
+    return NANOARROW_OK;
+  }
+
   ArrowErrorCode SetArrayColumn(const json& value, ArrowArrayView* array_view,
                                 ArrowArray* array, ArrowError* error,
                                 const std::string& parent_error_prefix = "") {
diff --git a/src/nanoarrow/nanoarrow_testing_test.cc b/src/nanoarrow/nanoarrow_testing_test.cc
index 0b8f733..2fcc62a 100644
--- a/src/nanoarrow/nanoarrow_testing_test.cc
+++ b/src/nanoarrow/nanoarrow_testing_test.cc
@@ -747,6 +747,79 @@ TEST(NanoarrowTestingTest, NanoarrowTestingTestReadFieldNested) {
   EXPECT_STREQ(schema->children[0]->format, "n");
 }
 
+TEST(NanoarrowTestingTest, NanoarrowTestingTestRoundtripDataFile) {
+  nanoarrow::UniqueArrayStream stream;
+  ArrowError error;
+  error.message[0] = '\0';
+
+  std::string data_file_json =
+      R"({"schema": {"fields": [)"
+      R"({"name": "col1", "nullable": true, "type": {"name": "null"}, "children": [], "metadata": null}, )"
+      R"({"name": "col2", "nullable": true, "type": {"name": "utf8"}, "children": [], "metadata": null}], )"
+      R"("metadata": null})"
+      R"(, "batches": [)"
+      R"({"count": 1, "columns": [)"
+      R"({"name": "col1", "count": 1}, )"
+      R"({"name": "col2", "count": 1, "VALIDITY": [1], "OFFSET": [0, 3], "DATA": ["abc"]}]}, )"
+      R"({"count": 2, "columns": [)"
+      R"({"name": "col1", "count": 2}, )"
+      R"({"name": "col2", "count": 2, "VALIDITY": [1, 1], "OFFSET": [0, 3, 5], "DATA": ["abc", "de"]}]})"
+      R"(]})";
+
+  TestingJSONReader reader;
+  ASSERT_EQ(reader.ReadDataFile(data_file_json, stream.get(), &error), NANOARROW_OK)
+      << error.message;
+
+  TestingJSONWriter writer;
+  std::stringstream data_file_json_roundtrip;
+  ASSERT_EQ(writer.WriteDataFile(data_file_json_roundtrip, stream.get()), NANOARROW_OK);
+  EXPECT_EQ(data_file_json_roundtrip.str(), data_file_json);
+
+  stream.reset();
+  data_file_json_roundtrip.str("");
+
+  // Check with zero batches
+  std::string data_file_json_empty =
+      R"({"schema": {"fields": [], "metadata": null}, "batches": []})";
+  ASSERT_EQ(reader.ReadDataFile(data_file_json_empty, stream.get(), &error), NANOARROW_OK)
+      << error.message;
+  ASSERT_EQ(writer.WriteDataFile(data_file_json_roundtrip, stream.get()), NANOARROW_OK);
+  EXPECT_EQ(data_file_json_roundtrip.str(), data_file_json_empty);
+
+  // Also test error for invalid JSON
+  ASSERT_EQ(reader.ReadDataFile("{", stream.get()), EINVAL);
+}
+
+TEST(NanoarrowTestingTest, NanoarrowTestingTestReadBatch) {
+  nanoarrow::UniqueSchema schema;
+  nanoarrow::UniqueArray array;
+  ArrowError error;
+  error.message[0] = '\0';
+
+  TestingJSONReader reader;
+
+  ArrowSchemaInit(schema.get());
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(schema.get(), 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_NA), NANOARROW_OK);
+
+  ASSERT_EQ(reader.ReadBatch(R"({"count": 1, "columns": [{"name": null, "count": 1}]})",
+                             schema.get(), array.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+  ASSERT_NE(array->release, nullptr);
+  EXPECT_EQ(array->length, 1);
+  ASSERT_EQ(array->n_children, 1);
+  EXPECT_EQ(array->children[0]->length, 1);
+
+  // Check invalid JSON
+  EXPECT_EQ(reader.ReadBatch(R"({)", schema.get(), array.get()), EINVAL);
+
+  // Check that field is validated
+  EXPECT_EQ(reader.ReadBatch(R"({"count": 1, "columns": [{"name": null, "count": -1}]})",
+                             schema.get(), array.get()),
+            EINVAL);
+}
+
 TEST(NanoarrowTestingTest, NanoarrowTestingTestReadColumnBasic) {
   nanoarrow::UniqueSchema schema;
   nanoarrow::UniqueArray array;