You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by np...@apache.org on 2020/12/21 20:44:00 UTC

[arrow] branch master updated: ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches

This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a2e7d3a  ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches
a2e7d3a is described below

commit a2e7d3a87fb8fa1cc98a54029c0262df468838fa
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Mon Dec 21 12:43:15 2020 -0800

    ARROW-10642: [R] Can't get Table from RecordBatchReader with 0 batches
    
    Closes #8956 from nealrichardson/zero-batches
    
    Authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Neal Richardson <ne...@gmail.com>
---
 r/R/arrowExports.R                          |  8 +++---
 r/R/record-batch-reader.R                   |  5 ++--
 r/src/arrowExports.cpp                      | 26 ++++++++---------
 r/src/recordbatchreader.cpp                 | 24 +++++++---------
 r/tests/testthat/test-Table.R               |  6 ++++
 r/tests/testthat/test-record-batch-reader.R | 43 +++++++++++++++++++++++++++++
 6 files changed, 79 insertions(+), 33 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 5e51908..bea0217 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1408,12 +1408,12 @@ ipc___RecordBatchFileReader__Open <- function(file){
     .Call(`_arrow_ipc___RecordBatchFileReader__Open` , file)
 }
 
-Table__from_RecordBatchFileReader <- function(reader){
-    .Call(`_arrow_Table__from_RecordBatchFileReader` , reader)
+Table__from_RecordBatchReader <- function(reader){
+    .Call(`_arrow_Table__from_RecordBatchReader` , reader)
 }
 
-Table__from_RecordBatchStreamReader <- function(reader){
-    .Call(`_arrow_Table__from_RecordBatchStreamReader` , reader)
+Table__from_RecordBatchFileReader <- function(reader){
+    .Call(`_arrow_Table__from_RecordBatchFileReader` , reader)
 }
 
 ipc___RecordBatchFileReader__batches <- function(reader){
diff --git a/r/R/record-batch-reader.R b/r/R/record-batch-reader.R
index f80df74..119ebd6 100644
--- a/r/R/record-batch-reader.R
+++ b/r/R/record-batch-reader.R
@@ -110,7 +110,7 @@ RecordBatchReader <- R6Class("RecordBatchReader", inherit = ArrowObject,
 RecordBatchStreamReader <- R6Class("RecordBatchStreamReader", inherit = RecordBatchReader,
   public = list(
     batches = function() ipc___RecordBatchStreamReader__batches(self),
-    read_table = function() Table__from_RecordBatchStreamReader(self)
+    read_table = function() Table__from_RecordBatchReader(self)
   )
 )
 RecordBatchStreamReader$create <- function(stream) {
@@ -128,7 +128,8 @@ RecordBatchStreamReader$create <- function(stream) {
 #' @format NULL
 #' @export
 RecordBatchFileReader <- R6Class("RecordBatchFileReader", inherit = ArrowObject,
-  # Why doesn't this inherit from RecordBatchReader?
+  # Why doesn't this inherit from RecordBatchReader in C++?
+  # Origin: https://github.com/apache/arrow/pull/679
   public = list(
     get_batch = function(i) {
       ipc___RecordBatchFileReader__ReadRecordBatch(self, i)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 975ff72..f4314a9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -5525,31 +5525,31 @@ extern "C" SEXP _arrow_ipc___RecordBatchFileReader__Open(SEXP file_sexp){
 
 // recordbatchreader.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
-extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
+std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(const std::shared_ptr<arrow::RecordBatchReader>& reader);
+extern "C" SEXP _arrow_Table__from_RecordBatchReader(SEXP reader_sexp){
 BEGIN_CPP11
-	arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchFileReader>&>::type reader(reader_sexp);
-	return cpp11::as_sexp(Table__from_RecordBatchFileReader(reader));
+	arrow::r::Input<const std::shared_ptr<arrow::RecordBatchReader>&>::type reader(reader_sexp);
+	return cpp11::as_sexp(Table__from_RecordBatchReader(reader));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
-	Rf_error("Cannot call Table__from_RecordBatchFileReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
+extern "C" SEXP _arrow_Table__from_RecordBatchReader(SEXP reader_sexp){
+	Rf_error("Cannot call Table__from_RecordBatchReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
 }
 #endif
 
 // recordbatchreader.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader);
-extern "C" SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP reader_sexp){
+std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader);
+extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
 BEGIN_CPP11
-	arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>&>::type reader(reader_sexp);
-	return cpp11::as_sexp(Table__from_RecordBatchStreamReader(reader));
+	arrow::r::Input<const std::shared_ptr<arrow::ipc::RecordBatchFileReader>&>::type reader(reader_sexp);
+	return cpp11::as_sexp(Table__from_RecordBatchFileReader(reader));
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_Table__from_RecordBatchStreamReader(SEXP reader_sexp){
-	Rf_error("Cannot call Table__from_RecordBatchStreamReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
+extern "C" SEXP _arrow_Table__from_RecordBatchFileReader(SEXP reader_sexp){
+	Rf_error("Cannot call Table__from_RecordBatchFileReader(). Please use arrow::install_arrow() to install required runtime libraries. ");
 }
 #endif
 
@@ -6758,8 +6758,8 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_ipc___RecordBatchFileReader__num_record_batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__num_record_batches, 1}, 
 		{ "_arrow_ipc___RecordBatchFileReader__ReadRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__ReadRecordBatch, 2}, 
 		{ "_arrow_ipc___RecordBatchFileReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__Open, 1}, 
+		{ "_arrow_Table__from_RecordBatchReader", (DL_FUNC) &_arrow_Table__from_RecordBatchReader, 1}, 
 		{ "_arrow_Table__from_RecordBatchFileReader", (DL_FUNC) &_arrow_Table__from_RecordBatchFileReader, 1}, 
-		{ "_arrow_Table__from_RecordBatchStreamReader", (DL_FUNC) &_arrow_Table__from_RecordBatchStreamReader, 1}, 
 		{ "_arrow_ipc___RecordBatchFileReader__batches", (DL_FUNC) &_arrow_ipc___RecordBatchFileReader__batches, 1}, 
 		{ "_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 2}, 
 		{ "_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteTable, 2}, 
diff --git a/r/src/recordbatchreader.cpp b/r/src/recordbatchreader.cpp
index 7648716..6f746e3 100644
--- a/r/src/recordbatchreader.cpp
+++ b/r/src/recordbatchreader.cpp
@@ -89,8 +89,18 @@ std::shared_ptr<arrow::ipc::RecordBatchFileReader> ipc___RecordBatchFileReader__
 }
 
 // [[arrow::export]]
+std::shared_ptr<arrow::Table> Table__from_RecordBatchReader(
+    const std::shared_ptr<arrow::RecordBatchReader>& reader) {
+  std::shared_ptr<arrow::Table> table = nullptr;
+  StopIfNotOk(reader->ReadAll(&table));
+  return table;
+}
+
+// [[arrow::export]]
 std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
     const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
+  // RecordBatchStreamReader inherits from RecordBatchReader
+  // but RecordBatchFileReader apparently does not
   int num_batches = reader->num_record_batches();
   std::vector<std::shared_ptr<arrow::RecordBatch>> batches(num_batches);
   for (int i = 0; i < num_batches; i++) {
@@ -101,20 +111,6 @@ std::shared_ptr<arrow::Table> Table__from_RecordBatchFileReader(
 }
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Table> Table__from_RecordBatchStreamReader(
-    const std::shared_ptr<arrow::ipc::RecordBatchStreamReader>& reader) {
-  std::shared_ptr<arrow::RecordBatch> batch;
-  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
-  while (true) {
-    StopIfNotOk(reader->ReadNext(&batch));
-    if (!batch) break;
-    batches.push_back(batch);
-  }
-
-  return ValueOrStop(arrow::Table::FromRecordBatches(std::move(batches)));
-}
-
-// [[arrow::export]]
 cpp11::list ipc___RecordBatchFileReader__batches(
     const std::shared_ptr<arrow::ipc::RecordBatchFileReader>& reader) {
   auto n = reader->num_record_batches();
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 2dbbab6..3cf8de7 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -353,6 +353,12 @@ test_that("table() auto splices (ARROW-5718)", {
   expect_equivalent(as.data.frame(tab3), df)
 })
 
+test_that("Validation when creating table with schema (ARROW-10953)", {
+  tab <- Table$create(data.frame(), schema = schema(a = int32()))
+  skip("This segfaults")
+  expect_identical(dim(as.data.frame(tab)), c(0L, 1L))
+})
+
 test_that("==.Table", {
   tab1 <- Table$create(x = 1:2, y = c("a", "b"))
   tab2 <- Table$create(x = 1:2, y = c("a", "b"))
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index 533d53e..9a5e4dd 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -75,6 +75,36 @@ test_that("RecordBatchFileReader / Writer", {
   expect_equal(reader$num_record_batches, 3)
 })
 
+test_that("StreamReader read_table", {
+  sink <- BufferOutputStream$create()
+  writer <- RecordBatchStreamWriter$create(sink, batch$schema)
+  expect_is(writer, "RecordBatchWriter")
+  writer$write(batch)
+  writer$write(tab)
+  writer$write(tbl)
+  writer$close()
+  buf <- sink$finish()
+
+  reader <- RecordBatchStreamReader$create(buf)
+  out <- reader$read_table()
+  expect_identical(dim(out), c(30L, 2L))
+})
+
+test_that("FileReader read_table", {
+  sink <- BufferOutputStream$create()
+  writer <- RecordBatchFileWriter$create(sink, batch$schema)
+  expect_is(writer, "RecordBatchWriter")
+  writer$write(batch)
+  writer$write(tab)
+  writer$write(tbl)
+  writer$close()
+  buf <- sink$finish()
+
+  reader <- RecordBatchFileReader$create(buf)
+  out <- reader$read_table()
+  expect_identical(dim(out), c(30L, 2L))
+})
+
 test_that("MetadataFormat", {
   expect_identical(get_ipc_metadata_version(5), 4L)
   expect_identical(get_ipc_metadata_version("V4"), 3L)
@@ -97,3 +127,16 @@ test_that("MetadataFormat", {
     '"45" is not a valid IPC MetadataVersion'
   )
 })
+
+test_that("reader with 0 batches", {
+  # IPC stream containing only a schema (ARROW-10642)
+  sink <- BufferOutputStream$create()
+  writer <- RecordBatchStreamWriter$create(sink, schema(a = int32()))
+  writer$close()
+  buf <- sink$finish()
+
+  reader <- RecordBatchStreamReader$create(buf)
+  tab <- reader$read_table()
+  expect_is(tab, "Table")
+  expect_identical(dim(tab), c(0L, 1L))
+})