You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/06/27 14:47:22 UTC
[arrow] branch master updated: ARROW-3732 [R] Add functions to
write RecordBatch or Schema to Message value, then read back
This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7ae6a58 ARROW-3732 [R] Add functions to write RecordBatch or Schema to Message value, then read back
7ae6a58 is described below
commit 7ae6a58d16eae82fb408608ba68a8c661ce239e8
Author: Romain Francois <ro...@rstudio.com>
AuthorDate: Thu Jun 27 10:47:08 2019 -0400
ARROW-3732 [R] Add functions to write RecordBatch or Schema to Message value, then read back
``` r
library(arrow, warn.conflicts = FALSE)
s <- schema(x = int32())
bytes <- s$serialize()
msg <- read_message(bytes)
s2 <- read_schema(msg)
s == s2
#> [1] TRUE
b <- record_batch(x = 1:10)
bytes <- b$serialize()
msg <- read_message(bytes)
b2 <- read_record_batch(msg, s)
b == b2
#> [1] TRUE
```
Author: Romain Francois <ro...@rstudio.com>
Closes #4673 from romainfrancois/ARROW-3732/Schema_serialize and squashes the following commits:
395f6b5f8 <Romain Francois> + Schema$Equals()
4d3fe841e <Romain Francois> + read_schema(<Message>)
899cd4720 <Romain Francois> read_message() handles raw vectors (and anything that BufferReader() can handle)
b0b910e75 <Romain Francois> + Schema$serialize() and additional tests in read_message()
666bd0efc <Romain Francois> move implementation of Schema__*() in schema.cpp
---
r/NAMESPACE | 3 +
r/R/Schema.R | 14 ++-
r/R/arrowExports.R | 52 +++++----
r/R/message.R | 7 +-
r/src/arrowExports.cpp | 212 +++++++++++++++++++++-------------
r/src/datatype.cpp | 33 ------
r/src/message.cpp | 9 ++
r/src/schema.cpp | 75 ++++++++++++
r/tests/testthat/test-message.R | 31 +++++
r/tests/testthat/test-messagereader.R | 32 +++++
10 files changed, 332 insertions(+), 136 deletions(-)
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 520f1b0..e82b30a 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -5,6 +5,7 @@ S3method("==","arrow::Array")
S3method("==","arrow::DataType")
S3method("==","arrow::Field")
S3method("==","arrow::RecordBatch")
+S3method("==","arrow::Schema")
S3method("==","arrow::ipc::Message")
S3method(BufferReader,"arrow::Buffer")
S3method(BufferReader,default)
@@ -67,12 +68,14 @@ S3method(parquet_file_reader,fs_path)
S3method(print,"arrow-enum")
S3method(read_message,"arrow::io::InputStream")
S3method(read_message,"arrow::ipc::MessageReader")
+S3method(read_message,default)
S3method(read_record_batch,"arrow::Buffer")
S3method(read_record_batch,"arrow::io::InputStream")
S3method(read_record_batch,"arrow::ipc::Message")
S3method(read_record_batch,raw)
S3method(read_schema,"arrow::Buffer")
S3method(read_schema,"arrow::io::InputStream")
+S3method(read_schema,"arrow::ipc::Message")
S3method(read_schema,raw)
S3method(read_table,"arrow::ipc::RecordBatchFileReader")
S3method(read_table,"arrow::ipc::RecordBatchStreamReader")
diff --git a/r/R/Schema.R b/r/R/Schema.R
index fbf6581..0d47196 100644
--- a/r/R/Schema.R
+++ b/r/R/Schema.R
@@ -46,13 +46,20 @@
public = list(
ToString = function() Schema__ToString(self),
num_fields = function() Schema__num_fields(self),
- field = function(i) shared_ptr(`arrow::Field`, Schema__field(self, i))
+ field = function(i) shared_ptr(`arrow::Field`, Schema__field(self, i)),
+ serialize = function() Schema__serialize(self),
+ Equals = function(other, check_metadata = TRUE) Schema__Equals(self, other, isTRUE(check_metadata))
),
active = list(
names = function() Schema__names(self)
)
)
+#' @export
+`==.arrow::Schema` <- function(lhs, rhs){
+ lhs$Equals(rhs)
+}
+
#' Schema factory
#'
#' @param ... named list of data types
@@ -92,3 +99,8 @@ read_schema <- function(stream, ...) {
on.exit(stream$close())
shared_ptr(`arrow::Schema`, ipc___ReadSchema_InputStream(stream))
}
+
+#' @export
+`read_schema.arrow::ipc::Message` <- function(stream, ...) {
+ shared_ptr(`arrow::Schema`, ipc___ReadSchema_Message(stream))
+}
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 06efab1..951b83b 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -400,26 +400,6 @@ DataType__id <- function(type){
.Call(`_arrow_DataType__id` , type)
}
-schema_ <- function(fields){
- .Call(`_arrow_schema_` , fields)
-}
-
-Schema__ToString <- function(s){
- .Call(`_arrow_Schema__ToString` , s)
-}
-
-Schema__num_fields <- function(s){
- .Call(`_arrow_Schema__num_fields` , s)
-}
-
-Schema__field <- function(s, i){
- .Call(`_arrow_Schema__field` , s, i)
-}
-
-Schema__names <- function(schema){
- .Call(`_arrow_Schema__names` , schema)
-}
-
ListType__ToString <- function(type){
.Call(`_arrow_ListType__ToString` , type)
}
@@ -736,6 +716,10 @@ ipc___ReadSchema_InputStream <- function(stream){
.Call(`_arrow_ipc___ReadSchema_InputStream` , stream)
}
+ipc___ReadSchema_Message <- function(message){
+ .Call(`_arrow_ipc___ReadSchema_Message` , message)
+}
+
ipc___MessageReader__Open <- function(stream){
.Call(`_arrow_ipc___MessageReader__Open` , stream)
}
@@ -916,6 +900,34 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema){
.Call(`_arrow_ipc___RecordBatchStreamWriter__Open` , stream, schema)
}
+schema_ <- function(fields){
+ .Call(`_arrow_schema_` , fields)
+}
+
+Schema__ToString <- function(s){
+ .Call(`_arrow_Schema__ToString` , s)
+}
+
+Schema__num_fields <- function(s){
+ .Call(`_arrow_Schema__num_fields` , s)
+}
+
+Schema__field <- function(s, i){
+ .Call(`_arrow_Schema__field` , s, i)
+}
+
+Schema__names <- function(schema){
+ .Call(`_arrow_Schema__names` , schema)
+}
+
+Schema__serialize <- function(schema){
+ .Call(`_arrow_Schema__serialize` , schema)
+}
+
+Schema__Equals <- function(schema, other, check_metadata){
+ .Call(`_arrow_Schema__Equals` , schema, other, check_metadata)
+}
+
Table__from_dataframe <- function(tbl){
.Call(`_arrow_Table__from_dataframe` , tbl)
}
diff --git a/r/R/message.R b/r/R/message.R
index 93c90c0..98d9248 100644
--- a/r/R/message.R
+++ b/r/R/message.R
@@ -32,7 +32,7 @@
`arrow::ipc::Message` <- R6Class("arrow::ipc::Message", inherit = `arrow::Object`,
public = list(
Equals = function(other){
- assert_that(inherits(other), "arrow::ipc::Message")
+ assert_that(inherits(other, "arrow::ipc::Message"))
ipc___Message__Equals(self, other)
},
body_length = function() ipc___Message__body_length(self),
@@ -95,6 +95,11 @@ read_message <- function(stream) {
}
#' @export
+read_message.default<- function(stream) {
+ read_message(BufferReader(stream))
+}
+
+#' @export
`read_message.arrow::io::InputStream` <- function(stream) {
unique_ptr(`arrow::ipc::Message`, ipc___ReadMessage(stream) )
}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d2962fa..356f9ab 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1532,82 +1532,6 @@ RcppExport SEXP _arrow_DataType__id(SEXP type_sexp){
// datatype.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Schema> schema_(List fields);
-RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<List>::type fields(fields_sexp);
- return Rcpp::wrap(schema_(fields));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
- Rf_error("Cannot call schema_(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s);
-RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
- return Rcpp::wrap(Schema__ToString(s));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
- Rf_error("Cannot call Schema__ToString(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s);
-RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
- return Rcpp::wrap(Schema__num_fields(s));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
- Rf_error("Cannot call Schema__num_fields(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s, int i);
-RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
- Rcpp::traits::input_parameter<int>::type i(i_sexp);
- return Rcpp::wrap(Schema__field(s, i));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
- Rf_error("Cannot call Schema__field(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
-CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema);
-RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
-BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
- return Rcpp::wrap(Schema__names(schema));
-END_RCPP
-}
-#else
-RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
- Rf_error("Cannot call Schema__names(). Please use arrow::install_arrow() to install required runtime libraries. ");
-}
-#endif
-
-// datatype.cpp
-#if defined(ARROW_R_WITH_ARROW)
std::string ListType__ToString(const std::shared_ptr<arrow::ListType>& type);
RcppExport SEXP _arrow_ListType__ToString(SEXP type_sexp){
BEGIN_RCPP
@@ -2830,6 +2754,21 @@ RcppExport SEXP _arrow_ipc___ReadSchema_InputStream(SEXP stream_sexp){
// message.cpp
#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(const std::unique_ptr<arrow::ipc::Message>& message);
+RcppExport SEXP _arrow_ipc___ReadSchema_Message(SEXP message_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::unique_ptr<arrow::ipc::Message>&>::type message(message_sexp);
+ return Rcpp::wrap(ipc___ReadSchema_Message(message));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_ipc___ReadSchema_Message(SEXP message_sexp){
+ Rf_error("Cannot call ipc___ReadSchema_Message(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// message.cpp
+#if defined(ARROW_R_WITH_ARROW)
std::unique_ptr<arrow::ipc::MessageReader> ipc___MessageReader__Open(const std::shared_ptr<arrow::io::InputStream>& stream);
RcppExport SEXP _arrow_ipc___MessageReader__Open(SEXP stream_sexp){
BEGIN_RCPP
@@ -3532,6 +3471,114 @@ RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEX
}
#endif
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Schema> schema_(Rcpp::List fields);
+RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<Rcpp::List>::type fields(fields_sexp);
+ return Rcpp::wrap(schema_(fields));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_schema_(SEXP fields_sexp){
+ Rf_error("Cannot call schema_(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s);
+RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+ return Rcpp::wrap(Schema__ToString(s));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__ToString(SEXP s_sexp){
+ Rf_error("Cannot call Schema__ToString(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s);
+RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+ return Rcpp::wrap(Schema__num_fields(s));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__num_fields(SEXP s_sexp){
+ Rf_error("Cannot call Schema__num_fields(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s, int i);
+RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type s(s_sexp);
+ Rcpp::traits::input_parameter<int>::type i(i_sexp);
+ return Rcpp::wrap(Schema__field(s, i));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__field(SEXP s_sexp, SEXP i_sexp){
+ Rf_error("Cannot call Schema__field(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+Rcpp::CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+ return Rcpp::wrap(Schema__names(schema));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__names(SEXP schema_sexp){
+ Rf_error("Cannot call Schema__names(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+Rcpp::RawVector Schema__serialize(const std::shared_ptr<arrow::Schema>& schema);
+RcppExport SEXP _arrow_Schema__serialize(SEXP schema_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+ return Rcpp::wrap(Schema__serialize(schema));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__serialize(SEXP schema_sexp){
+ Rf_error("Cannot call Schema__serialize(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// schema.cpp
+#if defined(ARROW_R_WITH_ARROW)
+bool Schema__Equals(const std::shared_ptr<arrow::Schema>& schema, const std::shared_ptr<arrow::Schema>& other, bool check_metadata);
+RcppExport SEXP _arrow_Schema__Equals(SEXP schema_sexp, SEXP other_sexp, SEXP check_metadata_sexp){
+BEGIN_RCPP
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type schema(schema_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<arrow::Schema>&>::type other(other_sexp);
+ Rcpp::traits::input_parameter<bool>::type check_metadata(check_metadata_sexp);
+ return Rcpp::wrap(Schema__Equals(schema, other, check_metadata));
+END_RCPP
+}
+#else
+RcppExport SEXP _arrow_Schema__Equals(SEXP schema_sexp, SEXP other_sexp, SEXP check_metadata_sexp){
+ Rf_error("Cannot call Schema__Equals(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
// table.cpp
#if defined(ARROW_R_WITH_ARROW)
std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
@@ -3813,11 +3860,6 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_DataType__num_children", (DL_FUNC) &_arrow_DataType__num_children, 1},
{ "_arrow_DataType__children_pointer", (DL_FUNC) &_arrow_DataType__children_pointer, 1},
{ "_arrow_DataType__id", (DL_FUNC) &_arrow_DataType__id, 1},
- { "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1},
- { "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1},
- { "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1},
- { "_arrow_Schema__field", (DL_FUNC) &_arrow_Schema__field, 2},
- { "_arrow_Schema__names", (DL_FUNC) &_arrow_Schema__names, 1},
{ "_arrow_ListType__ToString", (DL_FUNC) &_arrow_ListType__ToString, 1},
{ "_arrow_FixedWidthType__bit_width", (DL_FUNC) &_arrow_FixedWidthType__bit_width, 1},
{ "_arrow_DateType__unit", (DL_FUNC) &_arrow_DateType__unit, 1},
@@ -3897,6 +3939,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ipc___Message__Equals", (DL_FUNC) &_arrow_ipc___Message__Equals, 2},
{ "_arrow_ipc___ReadRecordBatch__Message__Schema", (DL_FUNC) &_arrow_ipc___ReadRecordBatch__Message__Schema, 2},
{ "_arrow_ipc___ReadSchema_InputStream", (DL_FUNC) &_arrow_ipc___ReadSchema_InputStream, 1},
+ { "_arrow_ipc___ReadSchema_Message", (DL_FUNC) &_arrow_ipc___ReadSchema_Message, 1},
{ "_arrow_ipc___MessageReader__Open", (DL_FUNC) &_arrow_ipc___MessageReader__Open, 1},
{ "_arrow_ipc___MessageReader__ReadNextMessage", (DL_FUNC) &_arrow_ipc___MessageReader__ReadNextMessage, 1},
{ "_arrow_ipc___ReadMessage", (DL_FUNC) &_arrow_ipc___ReadMessage, 1},
@@ -3942,6 +3985,13 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1},
{ "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2},
{ "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2},
+ { "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1},
+ { "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1},
+ { "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1},
+ { "_arrow_Schema__field", (DL_FUNC) &_arrow_Schema__field, 2},
+ { "_arrow_Schema__names", (DL_FUNC) &_arrow_Schema__names, 1},
+ { "_arrow_Schema__serialize", (DL_FUNC) &_arrow_Schema__serialize, 1},
+ { "_arrow_Schema__Equals", (DL_FUNC) &_arrow_Schema__Equals, 3},
{ "_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1},
{ "_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1},
{ "_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1},
diff --git a/r/src/datatype.cpp b/r/src/datatype.cpp
index f4a4b09..f625c89 100644
--- a/r/src/datatype.cpp
+++ b/r/src/datatype.cpp
@@ -167,39 +167,6 @@ arrow::Type::type DataType__id(const std::shared_ptr<arrow::DataType>& type) {
}
// [[arrow::export]]
-std::shared_ptr<arrow::Schema> schema_(List fields) {
- return arrow::schema(arrow::r::List_to_shared_ptr_vector<arrow::Field>(fields));
-}
-
-// [[arrow::export]]
-std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s) {
- return s->ToString();
-}
-
-// [[arrow::export]]
-int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s) {
- return s->num_fields();
-}
-
-// [[arrow::export]]
-std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s,
- int i) {
- if (i >= s->num_fields() || i < 0) {
- Rcpp::stop("Invalid field index for schema.");
- }
-
- return s->field(i);
-}
-
-// [[arrow::export]]
-CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema) {
- auto fields = schema->fields();
- return CharacterVector(
- fields.begin(), fields.end(),
- [](const std::shared_ptr<arrow::Field>& field) { return field->name(); });
-}
-
-// [[arrow::export]]
std::string ListType__ToString(const std::shared_ptr<arrow::ListType>& type) {
return type->ToString();
}
diff --git a/r/src/message.cpp b/r/src/message.cpp
index 2769458..1726eb2 100644
--- a/r/src/message.cpp
+++ b/r/src/message.cpp
@@ -75,6 +75,15 @@ std::shared_ptr<arrow::Schema> ipc___ReadSchema_InputStream(
return schema;
}
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> ipc___ReadSchema_Message(
+ const std::unique_ptr<arrow::ipc::Message>& message) {
+ std::shared_ptr<arrow::Schema> schema;
+ arrow::ipc::DictionaryMemo empty_memo;
+ STOP_IF_NOT_OK(arrow::ipc::ReadSchema(*message, &empty_memo, &schema));
+ return schema;
+}
+
//--------- MessageReader
// [[arrow::export]]
diff --git a/r/src/schema.cpp b/r/src/schema.cpp
new file mode 100644
index 0000000..7870043
--- /dev/null
+++ b/r/src/schema.cpp
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> schema_(Rcpp::List fields) {
+ return arrow::schema(arrow::r::List_to_shared_ptr_vector<arrow::Field>(fields));
+}
+
+// [[arrow::export]]
+std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s) {
+ return s->ToString();
+}
+
+// [[arrow::export]]
+int Schema__num_fields(const std::shared_ptr<arrow::Schema>& s) {
+ return s->num_fields();
+}
+
+// [[arrow::export]]
+std::shared_ptr<arrow::Field> Schema__field(const std::shared_ptr<arrow::Schema>& s,
+ int i) {
+ if (i >= s->num_fields() || i < 0) {
+ Rcpp::stop("Invalid field index for schema.");
+ }
+
+ return s->field(i);
+}
+
+// [[arrow::export]]
+Rcpp::CharacterVector Schema__names(const std::shared_ptr<arrow::Schema>& schema) {
+ auto fields = schema->fields();
+ return Rcpp::CharacterVector(
+ fields.begin(), fields.end(),
+ [](const std::shared_ptr<arrow::Field>& field) { return field->name(); });
+}
+
+// [[arrow::export]]
+Rcpp::RawVector Schema__serialize(const std::shared_ptr<arrow::Schema>& schema) {
+ arrow::ipc::DictionaryMemo empty_memo;
+ std::shared_ptr<arrow::Buffer> out;
+ STOP_IF_NOT_OK(arrow::ipc::SerializeSchema(*schema, &empty_memo,
+ arrow::default_memory_pool(), &out));
+
+ auto n = out->size();
+ Rcpp::RawVector vec(out->size());
+ std::copy_n(out->data(), n, vec.begin());
+
+ return vec;
+}
+
+// [[arrow::export]]
+bool Schema__Equals(const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<arrow::Schema>& other, bool check_metadata) {
+ return schema->Equals(*other, check_metadata);
+}
+
+#endif
diff --git a/r/tests/testthat/test-message.R b/r/tests/testthat/test-message.R
index 4cbf87d..5ddff01 100644
--- a/r/tests/testthat/test-message.R
+++ b/r/tests/testthat/test-message.R
@@ -31,3 +31,34 @@ test_that("read_message can read from input stream", {
message <- read_message(stream)
expect_null(read_message(stream))
})
+
+test_that("read_message() can read Schema messages", {
+ bytes <- schema(x=int32())$serialize()
+ stream <- BufferReader(bytes)
+ message <- read_message(stream)
+
+ expect_is(message, "arrow::ipc::Message")
+ expect_equal(message$type, MessageType$SCHEMA)
+ expect_is(message$body, "arrow::Buffer")
+ expect_is(message$metadata, "arrow::Buffer")
+
+ message <- read_message(stream)
+ expect_null(read_message(stream))
+})
+
+test_that("read_message() can handle raw vectors", {
+ batch <- record_batch(x = 1:10)
+ bytes <- batch$serialize()
+ stream <- BufferReader(bytes)
+
+ message_stream <- read_message(stream)
+ message_raw <- read_message(bytes)
+ expect_equal(message_stream, message_raw)
+
+ bytes <- schema(x=int32())$serialize()
+ stream <- BufferReader(bytes)
+ message_stream <- read_message(stream)
+ message_raw <- read_message(bytes)
+
+ expect_equal(message_stream, message_raw)
+})
diff --git a/r/tests/testthat/test-messagereader.R b/r/tests/testthat/test-messagereader.R
index 690228d..c7260fe 100644
--- a/r/tests/testthat/test-messagereader.R
+++ b/r/tests/testthat/test-messagereader.R
@@ -31,6 +31,20 @@ test_that("MessageReader can be created from raw vectors", {
message <- reader$ReadNextMessage()
expect_null(message)
+
+ schema <- schema(x = int32())
+ bytes <- schema$serialize()
+
+ reader <- MessageReader(bytes)
+
+ message <- reader$ReadNextMessage()
+ expect_is(message, "arrow::ipc::Message")
+ expect_equal(message$type, MessageType$SCHEMA)
+ expect_is(message$body, "arrow::Buffer")
+ expect_is(message$metadata, "arrow::Buffer")
+
+ message <- reader$ReadNextMessage()
+ expect_null(message)
})
test_that("MessageReader can be created from input stream", {
@@ -51,4 +65,22 @@ test_that("MessageReader can be created from input stream", {
message <- reader$ReadNextMessage()
expect_null(message)
+
+ schema <- schema(x = int32())
+ bytes <- schema$serialize()
+
+ stream <- BufferReader(bytes)
+ expect_is(stream, "arrow::io::BufferReader")
+
+ reader <- MessageReader(stream)
+ expect_is(reader, "arrow::ipc::MessageReader")
+
+ message <- reader$ReadNextMessage()
+ expect_is(message, "arrow::ipc::Message")
+ expect_equal(message$type, MessageType$SCHEMA)
+ expect_is(message$body, "arrow::Buffer")
+ expect_is(message$metadata, "arrow::Buffer")
+
+ message <- reader$ReadNextMessage()
+ expect_null(message)
})