You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pa...@apache.org on 2023/12/01 13:01:35 UTC
(arrow-nanoarrow) branch main updated: feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)
This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c3f5d5 feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)
5c3f5d5 is described below
commit 5c3f5d546bdb25625350926bf16f736837733901
Author: Dewey Dunnington <de...@dunnington.ca>
AuthorDate: Fri Dec 1 09:01:30 2023 -0400
feat: Add batch reader and data file read/write to/from ArrowArrayStream (#328)
This PR adds a few higher level functions that were missing:
`WriteDataFile()`, `ReadDataFile()`, and `ReadBatch()`. I think these
are the last pieces of JSON that have to be read in order to implement
golden file tests with at least a few arrow-testing files!
---------
Co-authored-by: Benjamin Kietzman <be...@gmail.com>
---
src/nanoarrow/nanoarrow_testing.hpp | 161 ++++++++++++++++++++++++++++++++
src/nanoarrow/nanoarrow_testing_test.cc | 73 +++++++++++++++
2 files changed, 234 insertions(+)
diff --git a/src/nanoarrow/nanoarrow_testing.hpp b/src/nanoarrow/nanoarrow_testing.hpp
index 103f22e..15580b6 100644
--- a/src/nanoarrow/nanoarrow_testing.hpp
+++ b/src/nanoarrow/nanoarrow_testing.hpp
@@ -46,6 +46,48 @@ namespace testing {
/// \brief Writer for the Arrow integration testing JSON format
class TestingJSONWriter {
public:
+  /// \brief Write an ArrowArrayStream as a data file JSON object to out
+  ///
+  /// Creates output like `{"schema": {...}, "batches": [...], ...}`. The stream's
+  /// schema is fetched and an ArrowArrayView is prepared before anything is written,
+  /// so failures at that stage leave `out` untouched; a failure while iterating
+  /// batches may still leave partial output in `out`.
+  ///
+  /// \param out Destination for the JSON text
+  /// \param stream A non-released ArrowArrayStream. It is iterated to completion but
+  ///   is not released by this function.
+  /// \param error Optional ArrowError forwarded to the nanoarrow ArrayView helpers
+  /// \return NANOARROW_OK on success, EINVAL for a null or released stream, or the
+  ///   error code propagated from the stream callbacks or nanoarrow helpers
+  ArrowErrorCode WriteDataFile(std::ostream& out, ArrowArrayStream* stream,
+                               ArrowError* error = nullptr) {
+    if (stream == nullptr || stream->release == nullptr) {
+      return EINVAL;
+    }
+
+    // Resolve everything that can fail up front, before emitting any output
+    nanoarrow::UniqueSchema schema;
+    NANOARROW_RETURN_NOT_OK(stream->get_schema(stream, schema.get()));
+
+    nanoarrow::UniqueArrayView array_view;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view.get(), schema.get(), error));
+
+    out << R"({"schema": )";
+    NANOARROW_RETURN_NOT_OK(WriteSchema(out, schema.get()));
+
+    out << R"(, "batches": [)";
+
+    std::string sep;
+    for (;;) {
+      // A fresh array per iteration; it is released at the end of each pass
+      nanoarrow::UniqueArray array;
+      NANOARROW_RETURN_NOT_OK(stream->get_next(stream, array.get()));
+      if (array->release == nullptr) {
+        // get_next() producing a released array marks the end of the stream
+        break;
+      }
+
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayViewSetArray(array_view.get(), array.get(), error));
+
+      out << sep;
+      sep = ", ";
+      NANOARROW_RETURN_NOT_OK(WriteBatch(out, schema.get(), array_view.get()));
+    }
+
+    out << "]}";
+
+    return NANOARROW_OK;
+  }
+
/// \brief Write a schema to out
///
/// Creates output like `{"fields": [...], "metadata": [...]}`.
@@ -622,6 +664,57 @@ class TestingJSONReader {
using json = nlohmann::json;
public:
+  /// \brief Read JSON representing a data file object
+  ///
+  /// Parses a JSON object of the form `{"schema": {...}, "batches": [...], ...}`.
+  /// On success, `out` receives a basic ArrowArrayStream holding one ArrowArray per
+  /// element of "batches". On failure `out` is left untouched and `error` (if
+  /// non-null) describes the problem; JSON library exceptions are reported as EINVAL.
+  ArrowErrorCode ReadDataFile(const std::string& data_file_json, ArrowArrayStream* out,
+                              ArrowError* error = nullptr) {
+    try {
+      auto data_file = json::parse(data_file_json);
+
+      NANOARROW_RETURN_NOT_OK(
+          Check(data_file.is_object(), error, "data file must be object"));
+      NANOARROW_RETURN_NOT_OK(
+          Check(data_file.contains("schema"), error, "data file missing key 'schema'"));
+
+      // The schema drives both the ArrayView and the output stream
+      nanoarrow::UniqueSchema schema;
+      NANOARROW_RETURN_NOT_OK(SetSchema(schema.get(), data_file.at("schema"), error));
+
+      NANOARROW_RETURN_NOT_OK(
+          Check(data_file.contains("batches"), error, "data file missing key 'batches'"));
+      const json& batch_list = data_file.at("batches");
+      NANOARROW_RETURN_NOT_OK(
+          Check(batch_list.is_array(), error, "data file batches must be array"));
+
+      // One ArrayView is reused to parse and validate every batch. It is set up
+      // before `schema` is handed to ArrowBasicArrayStreamInit below.
+      nanoarrow::UniqueArrayView view;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayViewInitFromSchema(view.get(), schema.get(), error));
+
+      // Reserve exactly one stream slot per JSON batch
+      nanoarrow::UniqueArrayStream result;
+      NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+          ArrowBasicArrayStreamInit(result.get(), schema.get(), batch_list.size()),
+          error);
+
+      int64_t slot = 0;
+      for (const json& batch_json : batch_list) {
+        nanoarrow::UniqueArray batch;
+        NANOARROW_RETURN_NOT_OK(
+            ArrowArrayInitFromArrayView(batch.get(), view.get(), error));
+        NANOARROW_RETURN_NOT_OK(
+            SetArrayBatch(batch_json, view.get(), batch.get(), error));
+        ArrowBasicArrayStreamSetArray(result.get(), slot++, batch.get());
+      }
+
+      ArrowArrayStreamMove(result.get(), out);
+      return NANOARROW_OK;
+    } catch (const json::exception& e) {
+      ArrowErrorSet(error, "Exception in TestingJSONReader::ReadDataFile(): %s",
+                    e.what());
+      return EINVAL;
+    }
+  }
+
/// \brief Read JSON representing a Schema
///
/// Reads a JSON object in the form `{"fields": [...], "metadata": [...]}`,
@@ -660,6 +753,34 @@ class TestingJSONReader {
}
}
+  /// \brief Read JSON representing a RecordBatch
+  ///
+  /// Parses a JSON object of the form `{"count": 123, "columns": [...]}` against
+  /// `schema` and, on success, moves the resulting ArrowArray into `out`. On failure
+  /// `out` is left untouched; JSON library exceptions are reported as EINVAL.
+  ArrowErrorCode ReadBatch(const std::string& batch_json, const ArrowSchema* schema,
+                           ArrowArray* out, ArrowError* error = nullptr) {
+    // The nanoarrow init functions below take a non-const ArrowSchema*
+    auto* mutable_schema = const_cast<ArrowSchema*>(schema);
+
+    try {
+      auto parsed = json::parse(batch_json);
+
+      // The view is what SetArrayBatch() populates and validates...
+      nanoarrow::UniqueArrayView view;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayViewInitFromSchema(view.get(), mutable_schema, error));
+
+      // ...while the array owns the buffers that end up in `out`
+      nanoarrow::UniqueArray batch;
+      NANOARROW_RETURN_NOT_OK(
+          ArrowArrayInitFromSchema(batch.get(), mutable_schema, error));
+
+      NANOARROW_RETURN_NOT_OK(SetArrayBatch(parsed, view.get(), batch.get(), error));
+      ArrowArrayMove(batch.get(), out);
+    } catch (const json::exception& e) {
+      ArrowErrorSet(error, "Exception in TestingJSONReader::ReadBatch(): %s", e.what());
+      return EINVAL;
+    }
+
+    return NANOARROW_OK;
+  }
+
/// \brief Read JSON representing a Column
///
/// Read a JSON object in the form
@@ -1094,6 +1215,46 @@ class TestingJSONReader {
return NANOARROW_OK;
}
+  /// Populate `array_view` and `array` from a RecordBatch JSON object of the form
+  /// `{"count": <integer>, "columns": [...]}`. Each column is delegated to
+  /// SetArrayColumn(), the view is then validated at NANOARROW_VALIDATION_LEVEL_FULL,
+  /// and only after that is `array` finished (without repeating validation).
+  ArrowErrorCode SetArrayBatch(const json& value, ArrowArrayView* array_view,
+                               ArrowArray* array, ArrowError* error) {
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.is_object(), error, "Expected RecordBatch to be a JSON object"));
+
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.contains("count"), error, "RecordBatch missing key 'count'"));
+
+    const auto& count = value["count"];
+    NANOARROW_RETURN_NOT_OK(
+        Check(count.is_number_integer(), error, "RecordBatch count must be integer"));
+    array_view->length = count.get<int64_t>();
+
+    NANOARROW_RETURN_NOT_OK(
+        Check(value.contains("columns"), error, "RecordBatch missing key 'columns'"));
+
+    const auto& columns = value["columns"];
+    NANOARROW_RETURN_NOT_OK(
+        Check(columns.is_array(), error, "RecordBatch columns must be array"));
+    // json::size() is unsigned while n_children is int64_t: compare explicitly as
+    // int64_t to avoid a signed/unsigned comparison (-Wsign-compare)
+    NANOARROW_RETURN_NOT_OK(
+        Check(static_cast<int64_t>(columns.size()) == array_view->n_children, error,
+              "RecordBatch children has incorrect size"));
+
+    for (int64_t i = 0; i < array_view->n_children; i++) {
+      NANOARROW_RETURN_NOT_OK(
+          SetArrayColumn(columns[i], array_view->children[i], array->children[i], error));
+    }
+
+    // Validate the array view
+    NANOARROW_RETURN_NOT_OK(PrefixError(
+        ArrowArrayViewValidate(array_view, NANOARROW_VALIDATION_LEVEL_FULL, error), error,
+        "RecordBatch failed to validate: "));
+
+    // Flush length and buffer pointers to the Array; validation already ran on the
+    // view above, so none is requested here
+    array->length = array_view->length;
+    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
+        ArrowArrayFinishBuilding(array, NANOARROW_VALIDATION_LEVEL_NONE, nullptr), error);
+
+    return NANOARROW_OK;
+  }
+
ArrowErrorCode SetArrayColumn(const json& value, ArrowArrayView* array_view,
ArrowArray* array, ArrowError* error,
const std::string& parent_error_prefix = "") {
diff --git a/src/nanoarrow/nanoarrow_testing_test.cc b/src/nanoarrow/nanoarrow_testing_test.cc
index 0b8f733..2fcc62a 100644
--- a/src/nanoarrow/nanoarrow_testing_test.cc
+++ b/src/nanoarrow/nanoarrow_testing_test.cc
@@ -747,6 +747,79 @@ TEST(NanoarrowTestingTest, NanoarrowTestingTestReadFieldNested) {
EXPECT_STREQ(schema->children[0]->format, "n");
}
+TEST(NanoarrowTestingTest, NanoarrowTestingTestRoundtripDataFile) {
+  nanoarrow::UniqueArrayStream stream;
+  ArrowError error;
+  error.message[0] = '\0';
+
+  std::string data_file_json =
+      R"({"schema": {"fields": [)"
+      R"({"name": "col1", "nullable": true, "type": {"name": "null"}, "children": [], "metadata": null}, )"
+      R"({"name": "col2", "nullable": true, "type": {"name": "utf8"}, "children": [], "metadata": null}], )"
+      R"("metadata": null})"
+      R"(, "batches": [)"
+      R"({"count": 1, "columns": [)"
+      R"({"name": "col1", "count": 1}, )"
+      R"({"name": "col2", "count": 1, "VALIDITY": [1], "OFFSET": [0, 3], "DATA": ["abc"]}]}, )"
+      R"({"count": 2, "columns": [)"
+      R"({"name": "col1", "count": 2}, )"
+      R"({"name": "col2", "count": 2, "VALIDITY": [1, 1], "OFFSET": [0, 3, 5], "DATA": ["abc", "de"]}]})"
+      R"(]})";
+
+  TestingJSONReader reader;
+  ASSERT_EQ(reader.ReadDataFile(data_file_json, stream.get(), &error), NANOARROW_OK)
+      << error.message;
+
+  TestingJSONWriter writer;
+  std::stringstream data_file_json_roundtrip;
+  ASSERT_EQ(writer.WriteDataFile(data_file_json_roundtrip, stream.get()), NANOARROW_OK);
+  EXPECT_EQ(data_file_json_roundtrip.str(), data_file_json);
+
+  stream.reset();
+  data_file_json_roundtrip.str("");
+
+  // Check with zero batches
+  std::string data_file_json_empty =
+      R"({"schema": {"fields": [], "metadata": null}, "batches": []})";
+  ASSERT_EQ(reader.ReadDataFile(data_file_json_empty, stream.get(), &error), NANOARROW_OK)
+      << error.message;
+  ASSERT_EQ(writer.WriteDataFile(data_file_json_roundtrip, stream.get()), NANOARROW_OK);
+  EXPECT_EQ(data_file_json_roundtrip.str(), data_file_json_empty);
+
+  // Also test error for invalid JSON. Release the (exhausted) stream first so the
+  // read never targets a live stream, then check that `out` is left untouched and
+  // that the error message was populated.
+  stream.reset();
+  error.message[0] = '\0';
+  ASSERT_EQ(reader.ReadDataFile("{", stream.get(), &error), EINVAL);
+  EXPECT_EQ(stream->release, nullptr);
+  EXPECT_NE(std::string(error.message).find("TestingJSONReader::ReadDataFile()"),
+            std::string::npos)
+      << error.message;
+}
+
+TEST(NanoarrowTestingTest, NanoarrowTestingTestReadBatch) {
+  nanoarrow::UniqueSchema schema;
+  nanoarrow::UniqueArray array;
+  ArrowError error;
+  error.message[0] = '\0';
+
+  TestingJSONReader reader;
+
+  ArrowSchemaInit(schema.get());
+  ASSERT_EQ(ArrowSchemaSetTypeStruct(schema.get(), 1), NANOARROW_OK);
+  ASSERT_EQ(ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_NA), NANOARROW_OK);
+
+  ASSERT_EQ(reader.ReadBatch(R"({"count": 1, "columns": [{"name": null, "count": 1}]})",
+                             schema.get(), array.get(), &error),
+            NANOARROW_OK)
+      << error.message;
+  ASSERT_NE(array->release, nullptr);
+  EXPECT_EQ(array->length, 1);
+  ASSERT_EQ(array->n_children, 1);
+  EXPECT_EQ(array->children[0]->length, 1);
+
+  // Release the batch read above so the failure cases start from an empty `out`
+  // and can verify that it is left untouched
+  array.reset();
+
+  // Check invalid JSON
+  error.message[0] = '\0';
+  EXPECT_EQ(reader.ReadBatch(R"({)", schema.get(), array.get(), &error), EINVAL);
+  EXPECT_EQ(array->release, nullptr);
+  EXPECT_NE(std::string(error.message).find("TestingJSONReader::ReadBatch()"),
+            std::string::npos)
+      << error.message;
+
+  // Check that field is validated
+  EXPECT_EQ(reader.ReadBatch(R"({"count": 1, "columns": [{"name": null, "count": -1}]})",
+                             schema.get(), array.get()),
+            EINVAL);
+  EXPECT_EQ(array->release, nullptr);
+}
+
TEST(NanoarrowTestingTest, NanoarrowTestingTestReadColumnBasic) {
nanoarrow::UniqueSchema schema;
nanoarrow::UniqueArray array;