You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by sh...@apache.org on 2020/05/03 14:02:39 UTC

[arrow] branch master updated: ARROW-8509: [GLib] Add low level record batch read/write functions

This is an automated email from the ASF dual-hosted git repository.

shiro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new efc7b81  ARROW-8509: [GLib] Add low level record batch read/write functions
efc7b81 is described below

commit efc7b816177ba0bc3b8fed86115cae2e538c5cf3
Author: Sutou Kouhei <ko...@clear-code.com>
AuthorDate: Sun May 3 23:02:13 2020 +0900

    ARROW-8509: [GLib] Add low level record batch read/write functions
    
    Closes #7086 from kou/glib-record-batch-serialize
    
    Authored-by: Sutou Kouhei <ko...@clear-code.com>
    Signed-off-by: Yosuke Shiro <yo...@gmail.com>
---
 c_glib/arrow-glib/Makefile.am            |  1 +
 c_glib/arrow-glib/arrow-glib.hpp         |  1 +
 c_glib/arrow-glib/input-stream.cpp       | 57 ++++++++++++++++++++++++++++++
 c_glib/arrow-glib/input-stream.h         |  8 +++++
 c_glib/arrow-glib/ipc-options.cpp        | 28 +++++++++++++--
 c_glib/arrow-glib/ipc-options.hpp        | 32 +++++++++++++++++
 c_glib/arrow-glib/meson.build            |  1 +
 c_glib/arrow-glib/output-stream.cpp      | 59 +++++++++++++++++++++++++++++---
 c_glib/arrow-glib/output-stream.h        |  8 +++++
 c_glib/arrow-glib/record-batch.cpp       | 59 ++++++++++++++++++++++++++++----
 c_glib/arrow-glib/record-batch.h         |  8 ++++-
 c_glib/test/test-buffer-input-stream.rb  | 25 ++++++++++++++
 c_glib/test/test-buffer-output-stream.rb | 25 ++++++++++++++
 c_glib/test/test-record-batch.rb         |  7 ++++
 14 files changed, 304 insertions(+), 15 deletions(-)

diff --git a/c_glib/arrow-glib/Makefile.am b/c_glib/arrow-glib/Makefile.am
index 2c07501..8e02ce3 100644
--- a/c_glib/arrow-glib/Makefile.am
+++ b/c_glib/arrow-glib/Makefile.am
@@ -182,6 +182,7 @@ libarrow_glib_la_cpp_headers +=			\
 	writable-file.hpp
 
 libarrow_glib_la_cpp_headers +=			\
+	ipc-options.hpp				\
 	metadata-version.hpp			\
 	reader.hpp				\
 	writer.hpp
diff --git a/c_glib/arrow-glib/arrow-glib.hpp b/c_glib/arrow-glib/arrow-glib.hpp
index d755b2b..1d58b48 100644
--- a/c_glib/arrow-glib/arrow-glib.hpp
+++ b/c_glib/arrow-glib/arrow-glib.hpp
@@ -44,6 +44,7 @@
 #include <arrow-glib/writable.hpp>
 #include <arrow-glib/writable-file.hpp>
 
+#include <arrow-glib/ipc-options.hpp>
 #include <arrow-glib/metadata-version.hpp>
 #include <arrow-glib/reader.hpp>
 #include <arrow-glib/writer.hpp>
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index c98442e..7b8419b 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -31,7 +31,10 @@
 #include <arrow-glib/error.hpp>
 #include <arrow-glib/file.hpp>
 #include <arrow-glib/input-stream.hpp>
+#include <arrow-glib/ipc-options.hpp>
 #include <arrow-glib/readable.hpp>
+#include <arrow-glib/record-batch.hpp>
+#include <arrow-glib/schema.hpp>
 #include <arrow-glib/tensor.hpp>
 
 #include <mutex>
@@ -300,6 +303,60 @@ garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
   }
 }
 
+/**
+ * garrow_input_stream_read_record_batch:
+ * @input_stream: A #GArrowInputStream.
+ * @schema: A #GArrowSchema for a read record batch.
+ * @options: (nullable): A #GArrowReadOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (transfer full) (nullable):
+ *   #GArrowRecordBatch on success, %NULL on error.
+ *
+ * Since: 1.0.0
+ */
+GArrowRecordBatch *
+garrow_input_stream_read_record_batch(GArrowInputStream *input_stream,
+                                      GArrowSchema *schema,
+                                      GArrowReadOptions *options,
+                                      GError **error)
+{
+  auto arrow_input_stream = garrow_input_stream_get_raw(input_stream);
+  auto arrow_schema = garrow_schema_get_raw(schema);
+
+  if (options) {
+    auto arrow_options = garrow_read_options_get_raw(options);
+    auto arrow_dictionary_memo =
+      garrow_read_options_get_dictionary_memo_raw(options);
+    auto arrow_record_batch =
+      arrow::ipc::ReadRecordBatch(arrow_schema,
+                                  arrow_dictionary_memo,
+                                  *arrow_options,
+                                  arrow_input_stream.get());
+    if (garrow::check(error,
+                      arrow_record_batch,
+                      "[input-stream][read-record-batch]")) {
+      return garrow_record_batch_new_raw(&(*arrow_record_batch));
+    } else {
+      return NULL;
+    }
+  } else {
+    auto arrow_options = arrow::ipc::IpcReadOptions::Defaults();
+    auto arrow_record_batch =
+      arrow::ipc::ReadRecordBatch(arrow_schema,
+                                  nullptr,
+                                  arrow_options,
+                                  arrow_input_stream.get());
+    if (garrow::check(error,
+                      arrow_record_batch,
+                      "[input-stream][read-record-batch]")) {
+      return garrow_record_batch_new_raw(&(*arrow_record_batch));
+    } else {
+      return NULL;
+    }
+  }
+}
+
 
 G_DEFINE_TYPE(GArrowSeekableInputStream,
               garrow_seekable_input_stream,
diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h
index 86b30cb..4b4c51e 100644
--- a/c_glib/arrow-glib/input-stream.h
+++ b/c_glib/arrow-glib/input-stream.h
@@ -23,6 +23,8 @@
 
 #include <arrow-glib/buffer.h>
 #include <arrow-glib/codec.h>
+#include <arrow-glib/ipc-options.h>
+#include <arrow-glib/record-batch.h>
 #include <arrow-glib/tensor.h>
 
 G_BEGIN_DECLS
@@ -46,6 +48,12 @@ gboolean garrow_input_stream_align(GArrowInputStream *input_stream,
                                    GError **error);
 GArrowTensor *garrow_input_stream_read_tensor(GArrowInputStream *input_stream,
                                               GError **error);
+GARROW_AVAILABLE_IN_1_0
+GArrowRecordBatch *
+garrow_input_stream_read_record_batch(GArrowInputStream *input_stream,
+                                      GArrowSchema *schema,
+                                      GArrowReadOptions *options,
+                                      GError **error);
 
 #define GARROW_TYPE_SEEKABLE_INPUT_STREAM       \
   (garrow_seekable_input_stream_get_type())
diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp
index 4d549ac..1cddd25 100644
--- a/c_glib/arrow-glib/ipc-options.cpp
+++ b/c_glib/arrow-glib/ipc-options.cpp
@@ -22,9 +22,7 @@
 #endif
 
 #include <arrow-glib/enums.h>
-#include <arrow-glib/ipc-options.h>
-
-#include <arrow/ipc/api.h>
+#include <arrow-glib/ipc-options.hpp>
 
 G_BEGIN_DECLS
 
@@ -41,6 +39,7 @@ G_BEGIN_DECLS
 
 typedef struct GArrowReadOptionsPrivate_ {
   arrow::ipc::IpcReadOptions options;
+  arrow::ipc::DictionaryMemo dictionary_memo;
 } GArrowReadOptionsPrivate;
 
 enum {
@@ -63,6 +62,7 @@ garrow_read_options_finalize(GObject *object)
   auto priv = GARROW_READ_OPTIONS_GET_PRIVATE(object);
 
   priv->options.~IpcReadOptions();
+  priv->dictionary_memo.~DictionaryMemo();
 
   G_OBJECT_CLASS(garrow_read_options_parent_class)->finalize(object);
 }
@@ -115,6 +115,7 @@ garrow_read_options_init(GArrowReadOptions *object)
   auto priv = GARROW_READ_OPTIONS_GET_PRIVATE(object);
   new(&priv->options) arrow::ipc::IpcReadOptions;
   priv->options = arrow::ipc::IpcReadOptions::Defaults();
+  new(&priv->dictionary_memo) arrow::ipc::DictionaryMemo;
 }
 
 static void
@@ -510,3 +511,24 @@ garrow_write_options_new(void)
 }
 
 G_END_DECLS
+
+arrow::ipc::IpcReadOptions *
+garrow_read_options_get_raw(GArrowReadOptions *options)
+{
+  auto priv = GARROW_READ_OPTIONS_GET_PRIVATE(options);
+  return &(priv->options);
+}
+
+arrow::ipc::DictionaryMemo *
+garrow_read_options_get_dictionary_memo_raw(GArrowReadOptions *options)
+{
+  auto priv = GARROW_READ_OPTIONS_GET_PRIVATE(options);
+  return &(priv->dictionary_memo);
+}
+
+arrow::ipc::IpcWriteOptions *
+garrow_write_options_get_raw(GArrowWriteOptions *options)
+{
+  auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(options);
+  return &(priv->options);
+}
diff --git a/c_glib/arrow-glib/ipc-options.hpp b/c_glib/arrow-glib/ipc-options.hpp
new file mode 100644
index 0000000..f57fbd3
--- /dev/null
+++ b/c_glib/arrow-glib/ipc-options.hpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <arrow/ipc/api.h>
+
+#include <arrow-glib/ipc-options.h>
+
+arrow::ipc::IpcReadOptions *
+garrow_read_options_get_raw(GArrowReadOptions *options);
+arrow::ipc::DictionaryMemo *
+garrow_read_options_get_dictionary_memo_raw(GArrowReadOptions *options);
+
+arrow::ipc::IpcWriteOptions *
+garrow_write_options_get_raw(GArrowWriteOptions *options);
diff --git a/c_glib/arrow-glib/meson.build b/c_glib/arrow-glib/meson.build
index db84538..1316de4 100644
--- a/c_glib/arrow-glib/meson.build
+++ b/c_glib/arrow-glib/meson.build
@@ -159,6 +159,7 @@ cpp_headers += files(
 )
 
 cpp_headers += files(
+  'ipc-options.hpp',
   'metadata-version.hpp',
   'reader.hpp',
   'writer.hpp',
diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp
index e9b5f8b..1653332 100644
--- a/c_glib/arrow-glib/output-stream.cpp
+++ b/c_glib/arrow-glib/output-stream.cpp
@@ -28,7 +28,9 @@
 #include <arrow-glib/codec.hpp>
 #include <arrow-glib/error.hpp>
 #include <arrow-glib/file.hpp>
+#include <arrow-glib/ipc-options.hpp>
 #include <arrow-glib/output-stream.hpp>
+#include <arrow-glib/record-batch.hpp>
 #include <arrow-glib/tensor.hpp>
 #include <arrow-glib/writable.hpp>
 
@@ -177,7 +179,7 @@ garrow_output_stream_class_init(GArrowOutputStreamClass *klass)
 
 /**
  * garrow_output_stream_align:
- * @stream: A #GArrowWritable.
+ * @stream: A #GArrowOutputStream.
  * @alignment: The byte multiple for the metadata prefix, usually 8
  *   or 64, to ensure the body starts on a multiple of that alignment.
  * @error: (nullable): Return location for a #GError or %NULL.
@@ -193,12 +195,12 @@ garrow_output_stream_align(GArrowOutputStream *stream,
 {
   auto arrow_stream = garrow_output_stream_get_raw(stream);
   auto status = arrow::ipc::AlignStream(arrow_stream.get(), alignment);
-  return garrow_error_check(error, status, "[output-stream][align]");
+  return garrow::check(error, status, "[output-stream][align]");
 }
 
 /**
  * garrow_output_stream_write_tensor:
- * @stream: A #GArrowWritable.
+ * @stream: A #GArrowOutputStream.
  * @tensor: A #GArrowTensor to be written.
  * @error: (nullable): Return location for a #GError or %NULL.
  *
@@ -211,15 +213,62 @@ garrow_output_stream_write_tensor(GArrowOutputStream *stream,
                                   GArrowTensor *tensor,
                                   GError **error)
 {
-  auto arrow_tensor = garrow_tensor_get_raw(tensor);
   auto arrow_stream = garrow_output_stream_get_raw(stream);
+  auto arrow_tensor = garrow_tensor_get_raw(tensor);
   int32_t metadata_length;
   int64_t body_length;
   auto status = arrow::ipc::WriteTensor(*arrow_tensor,
                                         arrow_stream.get(),
                                         &metadata_length,
                                         &body_length);
-  if (garrow_error_check(error, status, "[output-stream][write-tensor]")) {
+  if (garrow::check(error, status, "[output-stream][write-tensor]")) {
+    return metadata_length + body_length;
+  } else {
+    return -1;
+  }
+}
+
+/**
+ * garrow_output_stream_write_record_batch:
+ * @stream: A #GArrowOutputStream.
+ * @record_batch: A #GArrowRecordBatch to be written.
+ * @options: (nullable): A #GArrowWriteOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: The number of written bytes on success, -1 on error.
+ *
+ * Since: 1.0.0
+ */
+gint64
+garrow_output_stream_write_record_batch(GArrowOutputStream *stream,
+                                        GArrowRecordBatch *record_batch,
+                                        GArrowWriteOptions *options,
+                                        GError **error)
+{
+  auto arrow_stream = garrow_output_stream_get_raw(stream);
+  auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
+  int64_t buffer_start_offset = 0;
+  int32_t metadata_length;
+  int64_t body_length;
+  arrow::Status status;
+  if (options) {
+    auto arrow_options = garrow_write_options_get_raw(options);
+    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
+                                          buffer_start_offset,
+                                          arrow_stream.get(),
+                                          &metadata_length,
+                                          &body_length,
+                                          *arrow_options);
+  } else {
+    auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults();
+    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
+                                          buffer_start_offset,
+                                          arrow_stream.get(),
+                                          &metadata_length,
+                                          &body_length,
+                                          arrow_options);
+  }
+  if (garrow::check(error, status, "[output-stream][write-record-batch]")) {
     return metadata_length + body_length;
   } else {
     return -1;
diff --git a/c_glib/arrow-glib/output-stream.h b/c_glib/arrow-glib/output-stream.h
index bcfd818..eeef248 100644
--- a/c_glib/arrow-glib/output-stream.h
+++ b/c_glib/arrow-glib/output-stream.h
@@ -23,6 +23,8 @@
 
 #include <arrow-glib/buffer.h>
 #include <arrow-glib/codec.h>
+#include <arrow-glib/ipc-options.h>
+#include <arrow-glib/record-batch.h>
 #include <arrow-glib/tensor.h>
 
 G_BEGIN_DECLS
@@ -44,6 +46,12 @@ gboolean garrow_output_stream_align(GArrowOutputStream *stream,
 gint64 garrow_output_stream_write_tensor(GArrowOutputStream *stream,
                                          GArrowTensor *tensor,
                                          GError **error);
+GARROW_AVAILABLE_IN_1_0
+gint64
+garrow_output_stream_write_record_batch(GArrowOutputStream *stream,
+                                        GArrowRecordBatch *record_batch,
+                                        GArrowWriteOptions *options,
+                                        GError **error);
 
 
 #define GARROW_TYPE_FILE_OUTPUT_STREAM          \
diff --git a/c_glib/arrow-glib/record-batch.cpp b/c_glib/arrow-glib/record-batch.cpp
index 2b09b80..9e5f892 100644
--- a/c_glib/arrow-glib/record-batch.cpp
+++ b/c_glib/arrow-glib/record-batch.cpp
@@ -22,9 +22,11 @@
 #endif
 
 #include <arrow-glib/array.hpp>
+#include <arrow-glib/buffer.hpp>
 #include <arrow-glib/error.hpp>
 #include <arrow-glib/field.hpp>
 #include <arrow-glib/internal-index.hpp>
+#include <arrow-glib/ipc-options.hpp>
 #include <arrow-glib/record-batch.hpp>
 #include <arrow-glib/schema.hpp>
 
@@ -367,9 +369,12 @@ garrow_record_batch_add_column(GArrowRecordBatch *record_batch,
   const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
   const auto arrow_field = garrow_field_get_raw(field);
   const auto arrow_column = garrow_array_get_raw(column);
-  auto maybe_new_batch = arrow_record_batch->AddColumn(i, arrow_field, arrow_column);
-  if (garrow::check(error, maybe_new_batch, "[record-batch][add-column]")) {
-    return garrow_record_batch_new_raw(&(*maybe_new_batch));
+  auto arrow_new_record_batch =
+    arrow_record_batch->AddColumn(i, arrow_field, arrow_column);
+  if (garrow::check(error,
+                    arrow_new_record_batch,
+                    "[record-batch][add-column]")) {
+    return garrow_record_batch_new_raw(&(*arrow_new_record_batch));
   } else {
     return NULL;
   }
@@ -392,14 +397,56 @@ garrow_record_batch_remove_column(GArrowRecordBatch *record_batch,
                                   GError **error)
 {
   const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
-  auto maybe_new_batch = arrow_record_batch->RemoveColumn(i);
-  if (garrow::check(error, maybe_new_batch, "[record-batch][remove-column]")) {
-    return garrow_record_batch_new_raw(&(*maybe_new_batch));
+  auto arrow_new_record_batch = arrow_record_batch->RemoveColumn(i);
+  if (garrow::check(error,
+                    arrow_new_record_batch,
+                    "[record-batch][remove-column]")) {
+    return garrow_record_batch_new_raw(&(*arrow_new_record_batch));
   } else {
     return NULL;
   }
 }
 
+/**
+ * garrow_record_batch_serialize:
+ * @record_batch: A #GArrowRecordBatch.
+ * @options: (nullable): A #GArrowWriteOptions.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable) (transfer full): The newly allocated
+ *   #GArrowBuffer that contains a serialized record batch or %NULL on
+ *   error.
+ *
+ * Since: 1.0.0
+ */
+GArrowBuffer *
+garrow_record_batch_serialize(GArrowRecordBatch *record_batch,
+                              GArrowWriteOptions *options,
+                              GError **error)
+{
+  const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
+  arrow::Result<std::shared_ptr<arrow::Buffer>> arrow_buffer;
+  if (options) {
+    auto arrow_options = garrow_write_options_get_raw(options);
+    auto arrow_buffer = arrow::ipc::SerializeRecordBatch(*arrow_record_batch,
+                                                         *arrow_options);
+    if (garrow::check(error, arrow_buffer, "[record-batch][serialize]")) {
+      return garrow_buffer_new_raw(&(*arrow_buffer));
+    } else {
+      return NULL;
+    }
+  } else {
+    const auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults();
+    auto arrow_buffer = arrow::ipc::SerializeRecordBatch(*arrow_record_batch,
+                                                         arrow_options);
+    if (garrow::check(error, arrow_buffer, "[record-batch][serialize]")) {
+      return garrow_buffer_new_raw(&(*arrow_buffer));
+    } else {
+      return NULL;
+    }
+  }
+}
+
 
 typedef struct GArrowRecordBatchIteratorPrivate_ {
   arrow::RecordBatchIterator iterator;
diff --git a/c_glib/arrow-glib/record-batch.h b/c_glib/arrow-glib/record-batch.h
index 58c1da4..e3acdec 100644
--- a/c_glib/arrow-glib/record-batch.h
+++ b/c_glib/arrow-glib/record-batch.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <arrow-glib/array.h>
+#include <arrow-glib/ipc-options.h>
 #include <arrow-glib/schema.h>
 
 G_BEGIN_DECLS
@@ -70,6 +71,11 @@ GArrowRecordBatch *garrow_record_batch_add_column(GArrowRecordBatch *record_batc
 GArrowRecordBatch *garrow_record_batch_remove_column(GArrowRecordBatch *record_batch,
                                                      guint i,
                                                      GError **error);
+GARROW_AVAILABLE_IN_1_0
+GArrowBuffer *
+garrow_record_batch_serialize(GArrowRecordBatch *record_batch,
+                              GArrowWriteOptions *options,
+                              GError **error);
 
 
 #define GARROW_TYPE_RECORD_BATCH_ITERATOR       \
@@ -99,7 +105,7 @@ garrow_record_batch_iterator_equal(GArrowRecordBatchIterator *iterator,
                                    GArrowRecordBatchIterator *other_iterator);
 
 GARROW_AVAILABLE_IN_0_17
-GList*
+GList *
 garrow_record_batch_iterator_to_list(GArrowRecordBatchIterator *iterator,
                                      GError **error);
 
diff --git a/c_glib/test/test-buffer-input-stream.rb b/c_glib/test/test-buffer-input-stream.rb
index b0c1ee5..e31ea38 100644
--- a/c_glib/test/test-buffer-input-stream.rb
+++ b/c_glib/test/test-buffer-input-stream.rb
@@ -16,6 +16,8 @@
 # under the License.
 
 class TestBufferInputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_read
     buffer = Arrow::Buffer.new("Hello World")
     buffer_input_stream = Arrow::BufferInputStream.new(buffer)
@@ -83,4 +85,27 @@ class TestBufferInputStream < Test::Unit::TestCase
     assert_equal(data.encode(convert_encoding),
                  raw_read_data.dup.force_encoding(convert_encoding))
   end
+
+  def test_read_record_batch
+    fields = [
+      Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+      Arrow::Field.new("valid", Arrow::BooleanDataType.new),
+    ]
+    schema = Arrow::Schema.new(fields)
+    columns = [
+      build_boolean_array([true]),
+      build_boolean_array([false]),
+    ]
+    record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+
+    buffer = Arrow::ResizableBuffer.new(0)
+    output_stream = Arrow::BufferOutputStream.new(buffer)
+    output_stream.write_record_batch(record_batch)
+    output_stream.close
+
+    input_stream = Arrow::BufferInputStream.new(buffer)
+    options = Arrow::ReadOptions.new
+    assert_equal(record_batch,
+                 input_stream.read_record_batch(schema, options))
+  end
 end
diff --git a/c_glib/test/test-buffer-output-stream.rb b/c_glib/test/test-buffer-output-stream.rb
index b68b60e..9866762 100644
--- a/c_glib/test/test-buffer-output-stream.rb
+++ b/c_glib/test/test-buffer-output-stream.rb
@@ -16,6 +16,8 @@
 # under the License.
 
 class TestBufferOutputStream < Test::Unit::TestCase
+  include Helper::Buildable
+
   def test_new
     buffer = Arrow::ResizableBuffer.new(0)
     output_stream = Arrow::BufferOutputStream.new(buffer)
@@ -32,4 +34,27 @@ class TestBufferOutputStream < Test::Unit::TestCase
     output_stream.close
     assert_equal("Hello\x00\x00\x00", buffer.data.to_s)
   end
+
+  def test_write_record_batch
+    fields = [
+      Arrow::Field.new("visible", Arrow::BooleanDataType.new),
+      Arrow::Field.new("valid", Arrow::BooleanDataType.new),
+    ]
+    schema = Arrow::Schema.new(fields)
+    columns = [
+      build_boolean_array([true]),
+      build_boolean_array([false]),
+    ]
+    record_batch = Arrow::RecordBatch.new(schema, 1, columns)
+
+    buffer = Arrow::ResizableBuffer.new(0)
+    options = Arrow::WriteOptions.new
+    output_stream = Arrow::BufferOutputStream.new(buffer)
+    output_stream.write_record_batch(record_batch, options)
+    output_stream.close
+
+    input_stream = Arrow::BufferInputStream.new(buffer)
+    assert_equal(record_batch,
+                 input_stream.read_record_batch(schema))
+  end
 end
diff --git a/c_glib/test/test-record-batch.rb b/c_glib/test/test-record-batch.rb
index 6520a30..8bd6aab 100644
--- a/c_glib/test/test-record-batch.rb
+++ b/c_glib/test/test-record-batch.rb
@@ -174,5 +174,12 @@ valid:   [
       assert_equal(["valid"],
                    new_record_batch.schema.fields.collect(&:name))
     end
+
+    def test_serialize
+      buffer = @record_batch.serialize
+      input_stream = Arrow::BufferInputStream.new(buffer)
+      assert_equal(@record_batch,
+                   input_stream.read_record_batch(@record_batch.schema))
+    end
   end
 end