You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by np...@apache.org on 2020/12/21 20:44:00 UTC
[arrow] branch master updated: ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches
This is an automated email from the ASF dual-hosted git repository.
npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a2e7d3a ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches
a2e7d3a is described below
commit a2e7d3a87fb8fa1cc98a54029c0262df468838fa
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Mon Dec 21 12:43:15 2020 -0800
ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches
Closes #8956 from nealrichardson/zero-batches
Authored-by: Neal Richardson <ne...@gmail.com>
Signed-off-by: Neal Richardson <ne...@gmail.com>
---
r/R/arrowExports.R | 8 +++---
r/R/record-batch-reader.R | 5 ++--
r/src/arrowExports.cpp | 26 ++++++++---------
r/src/recordbatchreader.cpp | 24 +++++++---------
r/tests/testthat/test-Table.R | 6 ++++
r/tests/testthat/test-record-batch-reader.R | 43 +++++++++++++++++++++++++++++
6 files changed, 79 insertions(+), 33 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 5e51908..bea0217 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1408,12 +1408,12 @@ ipc___RecordBatchFileReader__Open <- function(file){
.Call(`_arrow_ipc___RecordBatchFileReader__Open` , file)
}
-Table__from_RecordBatchFileReader <- function(reader){
- .Call(`_arrow_Table__from_RecordBatchFileReader` , reader)
+Table__from_RecordBatchReader <- function(reader){
+ .Call(`_arrow_Table__from_RecordBatchReader` , reader)
}
-Table__from_RecordBatchStreamReader <- function(reader){
- .Call(`_arrow_Table__from_RecordBatchStreamReader` , reader)
+Table__from_RecordBatchFileReader <- function(reader){
+ .Call(`_arrow_Table__from_RecordBatchFileReader` , reader)
}
ipc___RecordBatchFileReader__batches <- function(reader){
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index f80df74..119ebd6 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -110,7 +110,7 @@ RecordBatchReader <- R6Class("RecordBatchReader", inherit = ArrowObject,
RecordBatchStreamReader <- R6Class("RecordBatchStreamReader", inherit = RecordBatchReader,
public = list(
batches = function() ipc___RecordBatchStreamReader__batches(self),
- read_table = function() Table__from_RecordBatchStreamReader(self)
+ read_table = function() Table__from_RecordBatchReader(self)
)
)
RecordBatchStreamReader$create <- function(stream) {
@@ -128,7 +128,8 @@ RecordBatchStreamReader$create <- function(stream) {
#' @format NULL
#' @export
RecordBatchFileReader <- R6Class("RecordBatchFileReader", inherit = ArrowObject,
- # Why doesn't this inherit from RecordBatchReader?
+ # Why doesn't this inherit from RecordBatchReader in C++?
+ # Origin: https://github.com/apache/arrow/pull/679
public = list(
get_batch = function(i) {
ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 975ff72..f4314a9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -5525,31 +5525,31 @@ extern "C" SEXP _arrow_ipc___RecordBatchFileReader__Open(SEXP file_sexp){
// recordbatchreader.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
-extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
+std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_Table__from_RecordBatchReader(SEXP reader_sexp){
BEGIN_CPP11
- arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchFileReader>&>::type reader(reader_sexp);
- return cpp11::as_sexp(Table__from_RecordBatchFileReader(reader));
+ arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+ return cpp11::as_sexp(Table__from_RecordBatchReader(reader));
END_CPP11
}
#else
-extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
- Rf_error("Cannot call Table__from_RecordBatchFileReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
+extern "C" SEXP _arrow_Table__from_RecordBatchReader(SEXP reader_sexp){
+ Rf_error("Cannot call Table__from_RecordBatchReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
}
#endif
// recordbatchreader.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader);
-extern "C" SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP reader_sexp){
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
BEGIN_CPP11
- arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>&>::type reader(reader_sexp);
- return cpp11::as_sexp(Table__from_RecordBatchStreamReader(reader));
+ arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchFileReader>&>::type reader(reader_sexp);
+ return cpp11::as_sexp(Table__from_RecordBatchFileReader(reader));
END_CPP11
}
#else
-extern "C" SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP reader_sexp){
- Rf_error("Cannot call Table__from_RecordBatchStreamReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
+extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
+ Rf_error("Cannot call Table__from_RecordBatchFileReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
}
#endif
@@ -6758,8 +6758,8 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1},
{ "_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2},
{ "_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__Open, 1},
+ { "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1},
{ "_arrow_Table__from_RecordBatchFileReader", (DL_FUNC) &_arrow_Table__from_RecordBatchFileReader, 1},
- { "_arrow_Table__from_RecordBatchStreamReader", (DL_FUNC) &_arrow_Table__from_RecordBatchStreamReader, 1},
{ "_arrow_ipc___RecordBatchFileReader__batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__batches, 1},
{ "_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 2},
{ "_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteTable, 2},
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 7648716..6f746e3 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -89,8 +89,18 @@ std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc___RecordBatchFileReader__
}
// [[arrow::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(
+ const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+ std::shared_ptr<arrow::Table> table = nullptr;
+ StopIfNotOk(reader->ReadAll(&table));
+ return table;
+}
+
+// [[arrow::export]]
std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+ // RecordBatchStreamReader inherits from RecordBatchReader
+ // but RecordBatchFileReader apparently does not
int num_batches = reader->num_record_batches();
std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
for (int i = 0; i < num_batches; i++) {
@@ -101,20 +111,6 @@ std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
}
// [[arrow::export]]
-std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(
- const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader) {
- std::shared_ptr<arrow::RecordBatch> batch;
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- while (true) {
- StopIfNotOk(reader->ReadNext(&batch));
- if (!batch) break;
- batches.push_back(batch);
- }
-
- return ValueOrStop(arrow::Table::FromRecordBatches(std::move(batches)));
-}
-
-// [[arrow::export]]
cpp11::list ipc___RecordBatchFileReader__batches(
const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
auto n = reader->num_record_batches();
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 2dbbab6..3cf8de7 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -353,6 +353,12 @@ test_that("table() auto splices (ARROW-5718)", {
expect_equivalent(as.data.frame(tab3), df)
})
+test_that("Validation when creating table with schema (ARROW-10953)", {
+ tab <- Table$create(data.frame(), schema = schema(a = int32()))
+ skip("This segfaults")
+ expect_identical(dim(as.data.frame(tab)), c(0L, 1L))
+})
+
test_that("==.Table", {
tab1 <- Table$create(x = 1:2, y = c("a", "b"))
tab2 <- Table$create(x = 1:2, y = c("a", "b"))
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index 533d53e..9a5e4dd 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -75,6 +75,36 @@ test_that("RecordBatchFileReader / Writer", {
expect_equal(reader$num_record_batches, 3)
})
+test_that("StreamReader read_table", {
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchStreamWriter$create(sink, batch$schema)
+ expect_is(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchStreamReader$create(buf)
+ out <- reader$read_table()
+ expect_identical(dim(out), c(30L, 2L))
+})
+
+test_that("FileReader read_table", {
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchFileWriter$create(sink, batch$schema)
+ expect_is(writer, "RecordBatchWriter")
+ writer$write(batch)
+ writer$write(tab)
+ writer$write(tbl)
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchFileReader$create(buf)
+ out <- reader$read_table()
+ expect_identical(dim(out), c(30L, 2L))
+})
+
test_that("MetadataFormat", {
expect_identical(get_ipc_metadata_version(5), 4L)
expect_identical(get_ipc_metadata_version("V4"), 3L)
@@ -97,3 +127,16 @@ test_that("MetadataFormat", {
'"45" is not a valid IPC MetadataVersion'
)
})
+
+test_that("reader with 0 batches", {
+ # IPC stream containing only a schema (ARROW-10642)
+ sink <- BufferOutputStream$create()
+ writer <- RecordBatchStreamWriter$create(sink, schema(a = int32()))
+ writer$close()
+ buf <- sink$finish()
+
+ reader <- RecordBatchStreamReader$create(buf)
+ tab <- reader$read_table()
+ expect_is(tab, "Table")
+ expect_identical(dim(tab), c(0L, 1L))
+})