You are viewing a plain-text version of this content; the canonical link to the original archived message was a hyperlink that is not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/06 20:17:48 UTC

[arrow] branch master updated: ARROW-3941: [R] RecordBatchStreamReader$schema

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a084006  ARROW-3941: [R] RecordBatchStreamReader$schema
a084006 is described below

commit a084006b54fba81a2da6aba8b328e1ce2dbd8bce
Author: Romain Francois <ro...@purrple.cat>
AuthorDate: Thu Dec 6 14:17:41 2018 -0600

    ARROW-3941: [R] RecordBatchStreamReader$schema
    
    https://issues.apache.org/jira/browse/ARROW-3941
    
    Follow-up to #3043 to fix the API of the classes RecordBatchFileReader, RecordBatchStreamReader, RecordBatchFileWriter, and RecordBatchStreamWriter.
    
    Author: Romain Francois <ro...@purrple.cat>
    
    Closes #3104 from romainfrancois/ARROW-3941/RecordBatchStreamReader and squashes the following commits:
    
    0d2494fc9 <Romain Francois> s/get/read/g
    01bd167db <Romain Francois> fix RecordBatch(Stream|File)(Reader|Writer) api
---
 r/NAMESPACE                               |  3 ++
 r/R/RecordBatchReader.R                   | 29 ++++++++++---
 r/tests/testthat/test-recordbatchreader.R | 68 +++++++++++++++++++++++++++++++
 3 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index 10677b4..cc5961e 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -12,12 +12,15 @@ S3method(FixedSizeBufferWriter,"arrow::Buffer")
 S3method(FixedSizeBufferWriter,default)
 S3method(MessageReader,"arrow::io::InputStream")
 S3method(MessageReader,default)
+S3method(RecordBatchFileReader,"arrow::Buffer")
 S3method(RecordBatchFileReader,"arrow::io::RandomAccessFile")
 S3method(RecordBatchFileReader,character)
 S3method(RecordBatchFileReader,fs_path)
+S3method(RecordBatchFileReader,raw)
 S3method(RecordBatchFileWriter,"arrow::io::OutputStream")
 S3method(RecordBatchFileWriter,character)
 S3method(RecordBatchFileWriter,fs_path)
+S3method(RecordBatchStreamReader,"arrow::Buffer")
 S3method(RecordBatchStreamReader,"arrow::io::InputStream")
 S3method(RecordBatchStreamReader,raw)
 S3method(RecordBatchStreamWriter,"arrow::io::OutputStream")
diff --git a/r/R/RecordBatchReader.R b/r/R/RecordBatchReader.R
index 222f055..6dab2d1 100644
--- a/r/R/RecordBatchReader.R
+++ b/r/R/RecordBatchReader.R
@@ -31,10 +31,12 @@
 #' @name arrow__RecordBatchReader
 `arrow::RecordBatchReader` <- R6Class("arrow::RecordBatchReader", inherit = `arrow::Object`,
   public = list(
-    schema = function() shared_ptr(`arrow::Schema`, RecordBatchReader__schema(self)),
-    ReadNext = function() {
+    read_next_batch = function() {
       shared_ptr(`arrow::RecordBatch`, RecordBatchReader__ReadNext(self))
     }
+  ),
+  active = list(
+    schema = function() shared_ptr(`arrow::Schema`, RecordBatchReader__schema(self))
   )
 )
 
@@ -70,11 +72,13 @@
 #' @name arrow__ipc__RecordBatchFileReader
 `arrow::ipc::RecordBatchFileReader` <- R6Class("arrow::ipc::RecordBatchFileReader", inherit = `arrow::Object`,
   public = list(
-    schema = function() shared_ptr(`arrow::Schema`, ipc___RecordBatchFileReader__schema(self)),
-    num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self),
-    ReadRecordBatch = function(i) shared_ptr(`arrow::RecordBatch`, ipc___RecordBatchFileReader__ReadRecordBatch(self, i)),
+    get_batch = function(i) shared_ptr(`arrow::RecordBatch`, ipc___RecordBatchFileReader__ReadRecordBatch(self, i)),
 
     batches = function() map(ipc___RecordBatchFileReader__batches(self), shared_ptr, class = `arrow::RecordBatch`)
+  ),
+  active = list(
+    num_record_batches = function() ipc___RecordBatchFileReader__num_record_batches(self),
+    schema = function() shared_ptr(`arrow::Schema`, ipc___RecordBatchFileReader__schema(self))
   )
 )
 
@@ -97,6 +101,11 @@ RecordBatchStreamReader <- function(stream){
   RecordBatchStreamReader(BufferReader(stream))
 }
 
+#' @export
+`RecordBatchStreamReader.arrow::Buffer` <- function(stream) {
+  RecordBatchStreamReader(BufferReader(stream))
+}
+
 
 #' Create an [arrow::ipc::RecordBatchFileReader][arrow__ipc__RecordBatchFileReader] from a file
 #'
@@ -122,3 +131,13 @@ RecordBatchFileReader <- function(file) {
 `RecordBatchFileReader.fs_path` <- function(file) {
   RecordBatchFileReader(ReadableFile(file))
 }
+
+#' @export
+`RecordBatchFileReader.arrow::Buffer` <- function(file) {
+  RecordBatchFileReader(BufferReader(file))
+}
+
+#' @export
+`RecordBatchFileReader.raw` <- function(file) {
+  RecordBatchFileReader(BufferReader(file))
+}
diff --git a/r/tests/testthat/test-recordbatchreader.R b/r/tests/testthat/test-recordbatchreader.R
new file mode 100644
index 0000000..d2b6a09
--- /dev/null
+++ b/r/tests/testthat/test-recordbatchreader.R
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+context("arrow::RecordBatch.*(Reader|Writer)")
+
+test_that("RecordBatchStreamReader / Writer", {
+  batch <- record_batch(tibble::tibble(
+    x = 1:10,
+    y = letters[1:10]
+  ))
+
+  sink <- BufferOutputStream()
+  writer <- RecordBatchStreamWriter(sink, batch$schema)
+  expect_is(writer, "arrow::ipc::RecordBatchStreamWriter")
+  writer$write_batch(batch)
+  writer$close()
+
+  buf <- sink$getvalue()
+  expect_is(buf, "arrow::Buffer")
+
+  reader <- RecordBatchStreamReader(buf)
+  expect_is(reader, "arrow::ipc::RecordBatchStreamReader")
+
+  batch1 <- reader$read_next_batch()
+  expect_is(batch1, "arrow::RecordBatch")
+  expect_equal(batch, batch1)
+
+  expect_null(reader$read_next_batch())
+})
+
+test_that("RecordBatchFileReader / Writer", {
+  batch <- record_batch(tibble::tibble(
+    x = 1:10,
+    y = letters[1:10]
+  ))
+
+  sink <- BufferOutputStream()
+  writer <- RecordBatchFileWriter(sink, batch$schema)
+  expect_is(writer, "arrow::ipc::RecordBatchFileWriter")
+  writer$write_batch(batch)
+  writer$close()
+
+  buf <- sink$getvalue()
+  expect_is(buf, "arrow::Buffer")
+
+  reader <- RecordBatchFileReader(buf)
+  expect_is(reader, "arrow::ipc::RecordBatchFileReader")
+
+  batch1 <- reader$get_batch(0L)
+  expect_is(batch1, "arrow::RecordBatch")
+  expect_equal(batch, batch1)
+
+  expect_equal(reader$num_record_batches, 1L)
+})