You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/06/27 14:47:22 UTC

[arrow] branch master updated: ARROW-3732 [R] Add functions to write RecordBatch or Schema to Message value, then read back

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ae6a58  ARROW-3732 [R] Add functions to write RecordBatch or Schema to Message value, then read back
7ae6a58 is described below

commit 7ae6a58d16eae82fb408608ba68a8c661ce239e8
Author: Romain Francois <ro...@rstudio.com>
AuthorDate: Thu Jun 27 10:47:08 2019 -0400

    ARROW-3732 [R] Add functions to write RecordBatch or Schema to Message value, then read back
    
    ``` r
    library(arrow, warn.conflicts = FALSE)
    
    s <- schema(x = int32())
    bytes <- s$serialize()
    msg <- read_message(bytes)
    
    s2 <- read_schema(msg)
    s == s2
    #> [1] TRUE
    
    b <- record_batch(x = 1:10)
    bytes <- b$serialize()
    msg <- read_message(bytes)
    b2 <- read_record_batch(msg, s)
    b == b2
    #> [1] TRUE
    ```
    
    Author: Romain Francois <ro...@rstudio.com>
    
    Closes #4673 from romainfrancois/ARROW-3732/Schema_serialize and squashes the following commits:
    
    395f6b5f8 <Romain Francois> + Schema$Equals()
    4d3fe841e <Romain Francois> + read_schema(<Message>)
    899cd4720 <Romain Francois> read_message() handles raw vectors (and anything that BufferReader() can handle
    b0b910e75 <Romain Francois> + Schema$serialize() and additional tests in read_message()
    666bd0efc <Romain Francois> move implementation of Schema__*() in schema.cpp
---
 r/NAMESPACE                           |   3 +
 r/R/Schema.R                          |  14 ++-
 r/R/arrowExports.R                    |  52 +++++----
 r/R/message.R                         |   7 +-
 r/src/arrowExports.cpp                | 212 +++++++++++++++++++++-------------
 r/src/datatype.cpp                    |  33 ------
 r/src/message.cpp                     |   9 ++
 r/src/schema.cpp                      |  75 ++++++++++++
 r/tests/testthat/test-message.R       |  31 +++++
 r/tests/testthat/test-messagereader.R |  32 +++++
 10 files changed, 332 insertions(+), 136 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index 520f1b0..e82b30a 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -5,6 +5,7 @@ S3method("==","arrow::Array")
 S3method("==","arrow::DataType")
 S3method("==","arrow::Field")
 S3method("==","arrow::RecordBatch")
+S3method("==","arrow::Schema")
 S3method("==","arrow::ipc::Message")
 S3method(BufferReader,"arrow::Buffer")
 S3method(BufferReader,default)
@@ -67,12 +68,14 @@ S3method(parquet_file_reader,fs_path)
 S3method(print,"arrow-enum")
 S3method(read_message,"arrow::io::InputStream")
 S3method(read_message,"arrow::ipc::MessageReader")
+S3method(read_message,default)
 S3method(read_record_batch,"arrow::Buffer")
 S3method(read_record_batch,"arrow::io::InputStream")
 S3method(read_record_batch,"arrow::ipc::Message")
 S3method(read_record_batch,raw)
 S3method(read_schema,"arrow::Buffer")
 S3method(read_schema,"arrow::io::InputStream")
+S3method(read_schema,"arrow::ipc::Message")
 S3method(read_schema,raw)
 S3method(read_table,"arrow::ipc::RecordBatchFileReader")
 S3method(read_table,"arrow::ipc::RecordBatchStreamReader")
diff --git a/r/R/Schema.R b/r/R/Schema.R
index fbf6581..0d47196 100644
--- a/r/R/Schema.R
+++ b/r/R/Schema.R
@@ -46,13 +46,20 @@
   public = list(
     ToString = function() Schema__ToString(self),
     num_fields = function() Schema__num_fields(self),
-    field = function(i) shared_ptr(`arrow::Field`, Schema__field(self, i))
+    field = function(i) shared_ptr(`arrow::Field`, Schema__field(self, i)),
+    serialize = function() Schema__serialize(self),
+    Equals = function(other, check_metadata = TRUE) Schema__Equals(self, other, isTRUE(check_metadata))
   ),
   active = list(
     names = function() Schema__names(self)
   )
 )
 
+#' @export
+`==.arrow::Schema` <- function(lhs, rhs){
+  lhs$Equals(rhs)
+}
+
 #' Schema factory
 #'
 #' @param ... named list of data types
@@ -92,3 +99,8 @@ read_schema <- function(stream, ...) {
   on.exit(stream$close())
   shared_ptr(`arrow::Schema`, ipc___ReadSchema_InputStream(stream))
 }
+
+#' @export
+`read_schema.arrow::ipc::Message` <- function(stream, ...) {
+  shared_ptr(`arrow::Schema`, ipc___ReadSchema_Message(stream))
+}
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 06efab1..951b83b 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -400,26 +400,6 @@ DataType__id <- function(type){
     .Call(`_arrow_DataType__id` , type)
 }
 
-schema_ <- function(fields){
-    .Call(`_arrow_schema_` , fields)
-}
-
-Schema__ToString <- function(s){
-    .Call(`_arrow_Schema__ToString` , s)
-}
-
-Schema__num_fields <- function(s){
-    .Call(`_arrow_Schema__num_fields` , s)
-}
-
-Schema__field <- function(s, i){
-    .Call(`_arrow_Schema__field` , s, i)
-}
-
-Schema__names <- function(schema){
-    .Call(`_arrow_Schema__names` , schema)
-}
-
 ListType__ToString <- function(type){
     .Call(`_arrow_ListType__ToString` , type)
 }
@@ -736,6 +716,10 @@ ipc___ReadSchema_InputStream <- function(stream){
     .Call(`_arrow_ipc___ReadSchema_InputStream` , stream)
 }
 
+ipc___ReadSchema_Message <- function(message){
+    .Call(`_arrow_ipc___ReadSchema_Message` , message)
+}
+
 ipc___MessageReader__Open <- function(stream){
     .Call(`_arrow_ipc___MessageReader__Open` , stream)
 }
@@ -916,6 +900,34 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema){
     .Call(`_arrow_ipc___RecordBatchStreamWriter__Open` , stream, schema)
 }
 
+schema_ <- function(fields){
+    .Call(`_arrow_schema_` , fields)
+}
+
+Schema__ToString <- function(s){
+    .Call(`_arrow_Schema__ToString` , s)
+}
+
+Schema__num_fields <- function(s){
+    .Call(`_arrow_Schema__num_fields` , s)
+}
+
+Schema__field <- function(s, i){
+    .Call(`_arrow_Schema__field` , s, i)
+}
+
+Schema__names <- function(schema){
+    .Call(`_arrow_Schema__names` , schema)
+}
+
+Schema__serialize <- function(schema){
+    .Call(`_arrow_Schema__serialize` , schema)
+}
+
+Schema__Equals <- function(schema, other, check_metadata){
+    .Call(`_arrow_Schema__Equals` , schema, other, check_metadata)
+}
+
 Table__from_dataframe <- function(tbl){
     .Call(`_arrow_Table__from_dataframe` , tbl)
 }
diff --git a/r/R/message.R b/r/R/message.R
index 93c90c0..98d9248 100644
--- a/r/R/message.R
+++ b/r/R/message.R
@@ -32,7 +32,7 @@
 `arrow::ipc::Message` <- R6Class("arrow::ipc::Message", inherit = `arrow::Object`,
   public = list(
     Equals = function(other){
-      assert_that(inherits(other), "arrow::ipc::Message")
+      assert_that(inherits(other, "arrow::ipc::Message"))
       ipc___Message__Equals(self, other)
     },
     body_length = function() ipc___Message__body_length(self),
@@ -95,6 +95,11 @@ read_message <- function(stream) {
 }
 
 #' @export
+read_message.default<- function(stream) {
+  read_message(BufferReader(stream))
+}
+
+#' @export
 `read_message.arrow::io::InputStream` <- function(stream) {
   unique_ptr(`arrow::ipc::Message`, ipc___ReadMessage(stream) )
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d2962fa..356f9ab 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1532,82 +1532,6 @@ RcppExport SEXP _arrow_DataType__id(SEXP type_sexp){
 
 // datatype.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> schema_(List fields);
-RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<List>::type fields(fields_sexp);
-	return Rcpp::wrap(schema_(fields));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
-	Rf_error("Cannot call schema_(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s);
-RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
-	return Rcpp::wrap(Schema__ToString(s));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
-	Rf_error("Cannot call Schema__ToString(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s);
-RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
-	return Rcpp::wrap(Schema__num_fields(s));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
-	Rf_error("Cannot call Schema__num_fields(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s, int i);
-RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
-	Rcpp::traits::input_parameter<int>::type i(i_sexp);
-	return Rcpp::wrap(Schema__field(s, i));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
-	Rf_error("Cannot call Schema__field(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema);
-RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
-BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
-	return Rcpp::wrap(Schema__names(schema));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
-	Rf_error("Cannot call Schema__names(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
 std::string ListType__ToString(const std::shared_ptr<arrow::ListType>& type);
 RcppExport SEXP _arrow_ListType__ToString(SEXP type_sexp){
 BEGIN_RCPP
@@ -2830,6 +2754,21 @@ RcppExport SEXP _arrow_ipc___ReadSchema_InputStream(SEXP stream_sexp){
 
 // message.cpp
 #if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(const std::unique_ptr<arrow::ipc::Message>& message);
+RcppExport SEXP _arrow_ipc___ReadSchema_Message(SEXP message_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::unique_ptr<arrow::ipc::Message>&>::type message(message_sexp);
+	return Rcpp::wrap(ipc___ReadSchema_Message(message));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_ipc___ReadSchema_Message(SEXP message_sexp){
+	Rf_error("Cannot call ipc___ReadSchema_Message(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// message.cpp
+#if defined(ARROW_R_WITH_ARROW)
 std::unique_ptr<arrow::ipc::MessageReader> ipc___MessageReader__Open(const std::shared_ptr<arrow::io::InputStream>& stream);
 RcppExport SEXP _arrow_ipc___MessageReader__Open(SEXP stream_sexp){
 BEGIN_RCPP
@@ -3532,6 +3471,114 @@ RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX
 }
 #endif
 
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Schema> schema_(Rcpp::List fields);
+RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<Rcpp::List>::type fields(fields_sexp);
+	return Rcpp::wrap(schema_(fields));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
+	Rf_error("Cannot call schema_(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s);
+RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+	return Rcpp::wrap(Schema__ToString(s));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
+	Rf_error("Cannot call Schema__ToString(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s);
+RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+	return Rcpp::wrap(Schema__num_fields(s));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
+	Rf_error("Cannot call Schema__num_fields(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s, int i);
+RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+	Rcpp::traits::input_parameter<int>::type i(i_sexp);
+	return Rcpp::wrap(Schema__field(s, i));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
+	Rf_error("Cannot call Schema__field(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+Rcpp::CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+	return Rcpp::wrap(Schema__names(schema));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
+	Rf_error("Cannot call Schema__names(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+Rcpp::RawVector Schema__serialize(const std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_Schema__serialize(SEXP schema_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+	return Rcpp::wrap(Schema__serialize(schema));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__serialize(SEXP schema_sexp){
+	Rf_error("Cannot call Schema__serialize(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+bool Schema__Equals(const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<arrow::Schema>& other, bool check_metadata);
+RcppExport SEXP _arrow_Schema__Equals(SEXP schema_sexp, SEXP other_sexp, SEXP check_metadata_sexp){
+BEGIN_RCPP
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type other(other_sexp);
+	Rcpp::traits::input_parameter<bool>::type check_metadata(check_metadata_sexp);
+	return Rcpp::wrap(Schema__Equals(schema, other, check_metadata));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__Equals(SEXP schema_sexp, SEXP other_sexp, SEXP check_metadata_sexp){
+	Rf_error("Cannot call Schema__Equals(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // table.cpp
 #if defined(ARROW_R_WITH_ARROW)
 std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
@@ -3813,11 +3860,6 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_DataType__num_children", (DL_FUNC) &_arrow_DataType__num_children, 1}, 
 		{ "_arrow_DataType__children_pointer", (DL_FUNC) &_arrow_DataType__children_pointer, 1}, 
 		{ "_arrow_DataType__id", (DL_FUNC) &_arrow_DataType__id, 1}, 
-		{ "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1}, 
-		{ "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1}, 
-		{ "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1}, 
-		{ "_arrow_Schema__field", (DL_FUNC) &_arrow_Schema__field, 2}, 
-		{ "_arrow_Schema__names", (DL_FUNC) &_arrow_Schema__names, 1}, 
 		{ "_arrow_ListType__ToString", (DL_FUNC) &_arrow_ListType__ToString, 1}, 
 		{ "_arrow_FixedWidthType__bit_width", (DL_FUNC) &_arrow_FixedWidthType__bit_width, 1}, 
 		{ "_arrow_DateType__unit", (DL_FUNC) &_arrow_DateType__unit, 1}, 
@@ -3897,6 +3939,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ipc___Message__Equals", (DL_FUNC) &_arrow_ipc___Message__Equals, 2}, 
 		{ "_arrow_ipc___ReadRecordBatch__Message__Schema", (DL_FUNC) &_arrow_ipc___ReadRecordBatch__Message__Schema, 2}, 
 		{ "_arrow_ipc___ReadSchema_InputStream", (DL_FUNC) &_arrow_ipc___ReadSchema_InputStream, 1}, 
+		{ "_arrow_ipc___ReadSchema_Message", (DL_FUNC) &_arrow_ipc___ReadSchema_Message, 1}, 
 		{ "_arrow_ipc___MessageReader__Open", (DL_FUNC) &_arrow_ipc___MessageReader__Open, 1}, 
 		{ "_arrow_ipc___MessageReader__ReadNextMessage", (DL_FUNC) &_arrow_ipc___MessageReader__ReadNextMessage, 1}, 
 		{ "_arrow_ipc___ReadMessage", (DL_FUNC) &_arrow_ipc___ReadMessage, 1}, 
@@ -3942,6 +3985,13 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, 
 		{ "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, 
 		{ "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, 
+		{ "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1}, 
+		{ "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1}, 
+		{ "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1}, 
+		{ "_arrow_Schema__field", (DL_FUNC) &_arrow_Schema__field, 2}, 
+		{ "_arrow_Schema__names", (DL_FUNC) &_arrow_Schema__names, 1}, 
+		{ "_arrow_Schema__serialize", (DL_FUNC) &_arrow_Schema__serialize, 1}, 
+		{ "_arrow_Schema__Equals", (DL_FUNC) &_arrow_Schema__Equals, 3}, 
 		{ "_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1}, 
 		{ "_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, 
 		{ "_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, 
diff --git a/r/src/datatype.cpp b/r/src/datatype.cpp
index f4a4b09..f625c89 100644
--- a/r/src/datatype.cpp
+++ b/r/src/datatype.cpp
@@ -167,39 +167,6 @@ arrow::Type::type DataType__id(const std::shared_ptr<arrow::DataType>& type) {
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Schema> schema_(List fields) {
-  return arrow::schema(arrow::r::List_to_shared_ptr_vector<arrow::Field>(fields));
-}
-
-// [[arrow::export]]
-std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s) {
-  return s->ToString();
-}
-
-// [[arrow::export]]
-int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s) {
-  return s->num_fields();
-}
-
-// [[arrow::export]]
-std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s,
-                                            int i) {
-  if (i >= s->num_fields() || i < 0) {
-    Rcpp::stop("Invalid field index for schema.");
-  }
-
-  return s->field(i);
-}
-
-// [[arrow::export]]
-CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema) {
-  auto fields = schema->fields();
-  return CharacterVector(
-      fields.begin(), fields.end(),
-      [](const std::shared_ptr<arrow::Field>& field) { return field->name(); });
-}
-
-// [[arrow::export]]
 std::string ListType__ToString(const std::shared_ptr<arrow::ListType>& type) {
   return type->ToString();
 }
diff --git a/r/src/message.cpp b/r/src/message.cpp
index 2769458..1726eb2 100644
--- a/r/src/message.cpp
+++ b/r/src/message.cpp
@@ -75,6 +75,15 @@ std::shared_ptr<arrow::Schema> ipc___ReadSchema_InputStream(
   return schema;
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(
+    const std::unique_ptr<arrow::ipc::Message>& message) {
+  std::shared_ptr<arrow::Schema> schema;
+  arrow::ipc::DictionaryMemo empty_memo;
+  STOP_IF_NOT_OK(arrow::ipc::ReadSchema(*message, &empty_memo, &schema));
+  return schema;
+}
+
 //--------- MessageReader
 
 // [[arrow::export]]
diff --git a/r/src/schema.cpp b/r/src/schema.cpp
new file mode 100644
index 0000000..7870043
--- /dev/null
+++ b/r/src/schema.cpp
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> schema_(Rcpp::List fields) {
+  return arrow::schema(arrow::r::List_to_shared_ptr_vector<arrow::Field>(fields));
+}
+
+// [[arrow::export]]
+std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s) {
+  return s->ToString();
+}
+
+// [[arrow::export]]
+int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s) {
+  return s->num_fields();
+}
+
+// [[arrow::export]]
+std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s,
+                                            int i) {
+  if (i >= s->num_fields() || i < 0) {
+    Rcpp::stop("Invalid field index for schema.");
+  }
+
+  return s->field(i);
+}
+
+// [[arrow::export]]
+Rcpp::CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema) {
+  auto fields = schema->fields();
+  return Rcpp::CharacterVector(
+      fields.begin(), fields.end(),
+      [](const std::shared_ptr<arrow::Field>& field) { return field->name(); });
+}
+
+// [[arrow::export]]
+Rcpp::RawVector Schema__serialize(const std::shared_ptr<arrow::Schema>& schema) {
+  arrow::ipc::DictionaryMemo empty_memo;
+  std::shared_ptr<arrow::Buffer> out;
+  STOP_IF_NOT_OK(arrow::ipc::SerializeSchema(*schema, &empty_memo,
+                                             arrow::default_memory_pool(), &out));
+
+  auto n = out->size();
+  Rcpp::RawVector vec(out->size());
+  std::copy_n(out->data(), n, vec.begin());
+
+  return vec;
+}
+
+// [[arrow::export]]
+bool Schema__Equals(const std::shared_ptr<arrow::Schema>& schema,
+                    const std::shared_ptr<arrow::Schema>& other, bool check_metadata) {
+  return schema->Equals(*other, check_metadata);
+}
+
+#endif
diff --git a/r/tests/testthat/test-message.R b/r/tests/testthat/test-message.R
index 4cbf87d..5ddff01 100644
--- a/r/tests/testthat/test-message.R
+++ b/r/tests/testthat/test-message.R
@@ -31,3 +31,34 @@ test_that("read_message can read from input stream", {
   message <- read_message(stream)
   expect_null(read_message(stream))
 })
+
+test_that("read_message() can read Schema messages", {
+  bytes <- schema(x=int32())$serialize()
+  stream <- BufferReader(bytes)
+  message <- read_message(stream)
+
+  expect_is(message, "arrow::ipc::Message")
+  expect_equal(message$type, MessageType$SCHEMA)
+  expect_is(message$body, "arrow::Buffer")
+  expect_is(message$metadata, "arrow::Buffer")
+
+  message <- read_message(stream)
+  expect_null(read_message(stream))
+})
+
+test_that("read_message() can handle raw vectors", {
+  batch <- record_batch(x = 1:10)
+  bytes <- batch$serialize()
+  stream <- BufferReader(bytes)
+
+  message_stream <- read_message(stream)
+  message_raw <- read_message(bytes)
+  expect_equal(message_stream, message_raw)
+
+  bytes <- schema(x=int32())$serialize()
+  stream <- BufferReader(bytes)
+  message_stream <- read_message(stream)
+  message_raw <- read_message(bytes)
+
+  expect_equal(message_stream, message_raw)
+})
diff --git a/r/tests/testthat/test-messagereader.R b/r/tests/testthat/test-messagereader.R
index 690228d..c7260fe 100644
--- a/r/tests/testthat/test-messagereader.R
+++ b/r/tests/testthat/test-messagereader.R
@@ -31,6 +31,20 @@ test_that("MessageReader can be created from raw vectors", {
 
   message <- reader$ReadNextMessage()
   expect_null(message)
+
+  schema <- schema(x = int32())
+  bytes <- schema$serialize()
+
+  reader <- MessageReader(bytes)
+
+  message <- reader$ReadNextMessage()
+  expect_is(message, "arrow::ipc::Message")
+  expect_equal(message$type, MessageType$SCHEMA)
+  expect_is(message$body, "arrow::Buffer")
+  expect_is(message$metadata, "arrow::Buffer")
+
+  message <- reader$ReadNextMessage()
+  expect_null(message)
 })
 
 test_that("MessageReader can be created from input stream", {
@@ -51,4 +65,22 @@ test_that("MessageReader can be created from input stream", {
 
   message <- reader$ReadNextMessage()
   expect_null(message)
+
+  schema <- schema(x = int32())
+  bytes <- schema$serialize()
+
+  stream <- BufferReader(bytes)
+  expect_is(stream, "arrow::io::BufferReader")
+
+  reader <- MessageReader(stream)
+  expect_is(reader, "arrow::ipc::MessageReader")
+
+  message <- reader$ReadNextMessage()
+  expect_is(message, "arrow::ipc::Message")
+  expect_equal(message$type, MessageType$SCHEMA)
+  expect_is(message$body, "arrow::Buffer")
+  expect_is(message$metadata, "arrow::Buffer")
+
+  message <- reader$ReadNextMessage()
+  expect_null(message)
 })