You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by np...@apache.org on 2020/12/30 16:29:33 UTC

[arrow] branch master updated: ARROW-10416: [R] Support Tables in Flight

This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 635f12b  ARROW-10416: [R] Support Tables in Flight
635f12b is described below

commit 635f12b3855994fff2de168f1ca14135769118f1
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Wed Dec 30 08:28:41 2020 -0800

    ARROW-10416: [R] Support Tables in Flight
    
    In addition to the feature described in the title, this PR
    
    * renames `push_data()` to `flight_put()` and adds an `overwrite` argument to optionally check for the existence of a flight with the same name
    * adds `list_flights()` and `flight_path_exists()` to see available resources
    * adds tests for the flight bindings, run if the demo flight server is found to be running (managing that process and/or running this in CI is deferred)
    * adds `r_to_py` and `py_to_r` methods for Schema objects; this fixes an issue where schema metadata was lost when converting Python/R due to the indirect way that Tables are passed
    
    Closes #9039 from nealrichardson/flight-tables
    
    Authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Neal Richardson <ne...@gmail.com>
---
 r/NAMESPACE                           |  4 ++-
 r/R/arrow-package.R                   | 16 ++++-----
 r/R/arrowExports.R                    |  4 +++
 r/R/flight.R                          | 68 +++++++++++++++++++++++++++--------
 r/R/python.R                          | 25 +++++++++++--
 r/_pkgdown.yml                        |  3 +-
 r/man/flight_get.Rd                   |  4 +--
 r/man/{push_data.Rd => flight_put.Rd} | 12 ++++---
 r/man/list_flights.Rd                 | 23 ++++++++++++
 r/src/arrowExports.cpp                | 16 +++++++++
 r/src/py-to-r.cpp                     |  6 ++++
 r/tests/testthat/test-python-flight.R | 63 ++++++++++++++++++++++++++++++++
 r/tests/testthat/test-python.R        | 31 ++++++++++++----
 r/vignettes/flight.Rmd                |  2 +-
 14 files changed, 235 insertions(+), 42 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index 316679a..4c50dd9 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -217,6 +217,8 @@ export(fixed_size_binary)
 export(fixed_size_list_of)
 export(flight_connect)
 export(flight_get)
+export(flight_path_exists)
+export(flight_put)
 export(float)
 export(float16)
 export(float32)
@@ -233,6 +235,7 @@ export(large_binary)
 export(large_list_of)
 export(large_utf8)
 export(last_col)
+export(list_flights)
 export(list_of)
 export(load_flight_server)
 export(map_batches)
@@ -244,7 +247,6 @@ export(null)
 export(num_range)
 export(one_of)
 export(open_dataset)
-export(push_data)
 export(read_arrow)
 export(read_csv_arrow)
 export(read_delim_arrow)
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 5d1dbbe..8ec3863 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -38,16 +38,13 @@
       s3_register(m, cl)
     }
   }
-
   s3_register("dplyr::tbl_vars", "arrow_dplyr_query")
-  s3_register("reticulate::py_to_r", "pyarrow.lib.Array")
-  s3_register("reticulate::py_to_r", "pyarrow.lib.RecordBatch")
-  s3_register("reticulate::py_to_r", "pyarrow.lib.ChunkedArray")
-  s3_register("reticulate::py_to_r", "pyarrow.lib.Table")
-  s3_register("reticulate::r_to_py", "Array")
-  s3_register("reticulate::r_to_py", "RecordBatch")
-  s3_register("reticulate::r_to_py", "ChunkedArray")
-  s3_register("reticulate::r_to_py", "Table")
+
+  for (cl in c("Array", "RecordBatch", "ChunkedArray", "Table", "Schema")) {
+    s3_register("reticulate::py_to_r", paste0("pyarrow.lib.", cl))
+    s3_register("reticulate::r_to_py", cl)
+  }
+
   invisible()
 }
 
@@ -128,4 +125,3 @@ ArrowObject <- R6Class("ArrowObject",
 all.equal.ArrowObject <- function(target, current, ..., check.attributes = TRUE) {
   target$Equals(current, check_metadata = check.attributes)
 }
-
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index bea0217..65c4dcd 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1272,6 +1272,10 @@ ImportRecordBatch <- function(array, schema){
     .Call(`_arrow_ImportRecordBatch` , array, schema)
 }
 
+ImportSchema <- function(schema){
+    .Call(`_arrow_ImportSchema` , schema)
+}
+
 allocate_arrow_schema <- function(){
     .Call(`_arrow_allocate_arrow_schema` )
 }
diff --git a/r/R/flight.R b/r/R/flight.R
index 7721661..486c59a 100644
--- a/r/R/flight.R
+++ b/r/R/flight.R
@@ -41,18 +41,27 @@ flight_connect <- function(host = "localhost", port, scheme = "grpc+tcp") {
 #' Send data to a Flight server
 #'
 #' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
-#' @param data `data.frame` or [RecordBatch] to upload
+#' @param data `data.frame`, [RecordBatch], or [Table] to upload
 #' @param path string identifier to store the data under
+#' @param overwrite logical: if `path` exists on `client` already, should we
+#' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and
+#' `path` exists, the function will error.
 #' @return `client`, invisibly.
 #' @export
-push_data <- function(client, data, path) {
-  if (inherits(data, "data.frame")) {
-    data <- record_batch(data)
+flight_put <- function(client, data, path, overwrite = TRUE) {
+  if (!overwrite && flight_path_exists(client, path)) {
+    stop(path, " exists.", call. = FALSE)
+  }
+  if (is.data.frame(data)) {
+    data <- Table$create(data)
   }
-  # TODO: this is only RecordBatch; handle Table
   py_data <- reticulate::r_to_py(data)
   writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]]
-  writer$write_batch(py_data)
+  if (inherits(data, "RecordBatch")) {
+    writer$write_batch(py_data)
+  } else {
+    writer$write_table(py_data)
+  }
   writer$close()
   invisible(client)
 }
@@ -60,22 +69,53 @@ push_data <- function(client, data, path) {
 #' Get data from a Flight server
 #'
 #' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
-#' @param path string identifier under which the data is stored
-#' @return A [RecordBatch]
+#' @param path string identifier under which data is stored
+#' @return A [Table]
 #' @export
 flight_get <- function(client, path) {
+  reader <- flight_reader(client, path)
+  reader$read_all()
+}
+
+# TODO: could use this as a RecordBatch iterator, call $read_chunk() on this
+flight_reader <- function(client, path) {
   info <- client$get_flight_info(descriptor_for_path(path))
   # Hack: assume a single ticket, on the same server as client is already connected
   ticket <- info$endpoints[[1]]$ticket
-  reader <- client$do_get(ticket)
-  # Next hack: assume a single record batch
-  # TODO: read_all() instead? Or read all chunks and build Table in R?
-  chunk <- reader$read_chunk()
-  # Drop $app_metadata and just return the data
-  chunk$data
+  client$do_get(ticket)
 }
 
 descriptor_for_path <- function(path) {
   pa <- reticulate::import("pyarrow")
   pa$flight$FlightDescriptor$for_path(path)
 }
+
+#' See available resources on a Flight server
+#'
+#' @inheritParams flight_get
+#' @return `list_flights()` returns a character vector of paths.
+#' `flight_path_exists()` returns a logical value, the equivalent of `path %in% list_flights()`
+#' @export
+list_flights <- function(client) {
+  generator <- client$list_flights()
+  out <- reticulate::iterate(generator, function(x) as.character(x$descriptor$path[[1]]))
+  out
+}
+
+#' @rdname list_flights
+#' @export
+flight_path_exists <- function(client, path) {
+  it_exists <- tryCatch({
+      client$get_flight_info(descriptor_for_path(path))
+      TRUE
+    },
+    error = function(e) {
+      msg <- conditionMessage(e)
+      if (!any(grepl("ArrowKeyError", msg))) {
+        # Raise an error if this fails for any reason other than not found
+        stop(e)
+      }
+      FALSE
+    }
+  )
+}
diff --git a/r/R/python.R b/r/R/python.R
index 739f048..b200d93 100644
--- a/r/R/python.R
+++ b/r/R/python.R
@@ -90,7 +90,7 @@ py_to_r.pyarrow.lib.ChunkedArray <- function(x, ...) {
 r_to_py.Table <- function(x, convert = FALSE) {
   # Import with convert = FALSE so that `_import_from_c` returns a Python object
   pa <- reticulate::import("pyarrow", convert = FALSE)
-  out <- pa$Table$from_arrays(x$columns, names = names(x))
+  out <- pa$Table$from_arrays(x$columns, schema = x$schema)
   # But set the convert attribute on the return object to the requested value
   assign("convert", convert, out)
   out
@@ -100,7 +100,28 @@ py_to_r.pyarrow.lib.Table <- function(x, ...) {
   colnames <- maybe_py_to_r(x$column_names)
   r_cols <- maybe_py_to_r(x$columns)
   names(r_cols) <- colnames
-  Table$create(!!!r_cols)
+  Table$create(!!!r_cols, schema = maybe_py_to_r(x$schema))
+}
+
+py_to_r.pyarrow.lib.Schema <- function(x, ...) {
+  schema_ptr <- allocate_arrow_schema()
+  on.exit(delete_arrow_schema(schema_ptr))
+
+  x$`_export_to_c`(schema_ptr)
+  ImportSchema(schema_ptr)
+}
+
+r_to_py.Schema <- function(x, convert = FALSE) {
+  schema_ptr <- allocate_arrow_schema()
+  on.exit(delete_arrow_schema(schema_ptr))
+
+  # Import with convert = FALSE so that `_import_from_c` returns a Python object
+  pa <- reticulate::import("pyarrow", convert = FALSE)
+  ExportSchema(x, schema_ptr)
+  out <- pa$Schema$`_import_from_c`(schema_ptr)
+  # But set the convert attribute on the return object to the requested value
+  assign("convert", convert, out)
+  out
 }
 
 maybe_py_to_r <- function(x) {
diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml
index d946f31..b3eb895 100644
--- a/r/_pkgdown.yml
+++ b/r/_pkgdown.yml
@@ -122,8 +122,9 @@ reference:
   contents:
   - load_flight_server
   - flight_connect
-  - push_data
   - flight_get
+  - flight_put
+  - list_flights
 - title: File systems
   contents:
   - s3_bucket
diff --git a/r/man/flight_get.Rd b/r/man/flight_get.Rd
index f427b39..a79c4d7 100644
--- a/r/man/flight_get.Rd
+++ b/r/man/flight_get.Rd
@@ -9,10 +9,10 @@ flight_get(client, path)
 \arguments{
 \item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
 
-\item{path}{string identifier under which the data is stored}
+\item{path}{string identifier under which data is stored}
 }
 \value{
-A \link{RecordBatch}
+A \link{Table}
 }
 \description{
 Get data from a Flight server
diff --git a/r/man/push_data.Rd b/r/man/flight_put.Rd
similarity index 51%
rename from r/man/push_data.Rd
rename to r/man/flight_put.Rd
index 8fb65c7..13a8da1 100644
--- a/r/man/push_data.Rd
+++ b/r/man/flight_put.Rd
@@ -1,17 +1,21 @@
 % Generated by roxygen2: do not edit by hand
 % Please edit documentation in R/flight.R
-\name{push_data}
-\alias{push_data}
+\name{flight_put}
+\alias{flight_put}
 \title{Send data to a Flight server}
 \usage{
-push_data(client, data, path)
+flight_put(client, data, path, overwrite = TRUE)
 }
 \arguments{
 \item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
 
-\item{data}{\code{data.frame} or \link{RecordBatch} to upload}
+\item{data}{\code{data.frame}, \link{RecordBatch}, or \link{Table} to upload}
 
 \item{path}{string identifier to store the data under}
+
+\item{overwrite}{logical: if \code{path} exists on \code{client} already, should we
+replace it with the contents of \code{data}? Default is \code{TRUE}; if \code{FALSE} and
+\code{path} exists, the function will error.}
 }
 \value{
 \code{client}, invisibly.
diff --git a/r/man/list_flights.Rd b/r/man/list_flights.Rd
new file mode 100644
index 0000000..d8ebb0d
--- /dev/null
+++ b/r/man/list_flights.Rd
@@ -0,0 +1,23 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/flight.R
+\name{list_flights}
+\alias{list_flights}
+\alias{flight_path_exists}
+\title{See available resources on a Flight server}
+\usage{
+list_flights(client)
+
+flight_path_exists(client, path)
+}
+\arguments{
+\item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
+
+\item{path}{string identifier under which data is stored}
+}
+\value{
+\code{list_flights()} returns a character vector of paths.
+\code{flight_path_exists()} returns a logical value, the equivalent of \code{path \%in\% list_flights()}
+}
+\description{
+See available resources on a Flight server
+}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index f4314a9..093949d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4990,6 +4990,21 @@ extern "C" SEXP _arrow_ImportRecordBatch(SEXP array_sexp, SEXP schema_sexp){
 
 // py-to-r.cpp
 #if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<arrow::Schema> ImportSchema(arrow::r::Pointer<struct ArrowSchema> schema);
+extern "C" SEXP _arrow_ImportSchema(SEXP schema_sexp){
+BEGIN_CPP11
+	arrow::r::Input<arrow::r::Pointer<struct ArrowSchema>>::type schema(schema_sexp);
+	return cpp11::as_sexp(ImportSchema(schema));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ImportSchema(SEXP schema_sexp){
+	Rf_error("Cannot call ImportSchema(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// py-to-r.cpp
+#if defined(ARROW_R_WITH_ARROW)
 arrow::r::Pointer<struct ArrowSchema> allocate_arrow_schema();
 extern "C" SEXP _arrow_allocate_arrow_schema(){
 BEGIN_CPP11
@@ -6724,6 +6739,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_parquet___arrow___FileReader__GetSchema", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema, 1}, 
 		{ "_arrow_ImportArray", (DL_FUNC) &_arrow_ImportArray, 2}, 
 		{ "_arrow_ImportRecordBatch", (DL_FUNC) &_arrow_ImportRecordBatch, 2}, 
+		{ "_arrow_ImportSchema", (DL_FUNC) &_arrow_ImportSchema, 1}, 
 		{ "_arrow_allocate_arrow_schema", (DL_FUNC) &_arrow_allocate_arrow_schema, 0}, 
 		{ "_arrow_delete_arrow_schema", (DL_FUNC) &_arrow_delete_arrow_schema, 1}, 
 		{ "_arrow_allocate_arrow_array", (DL_FUNC) &_arrow_allocate_arrow_array, 0}, 
diff --git a/r/src/py-to-r.cpp b/r/src/py-to-r.cpp
index 5677791..a571cfa 100644
--- a/r/src/py-to-r.cpp
+++ b/r/src/py-to-r.cpp
@@ -35,6 +35,12 @@ std::shared_ptr<arrow::RecordBatch> ImportRecordBatch(
 }
 
 // [[arrow::export]]
+std::shared_ptr<arrow::Schema> ImportSchema(
+    arrow::r::Pointer<struct ArrowSchema> schema) {
+  return ValueOrStop(arrow::ImportSchema(schema));
+}
+
+// [[arrow::export]]
 arrow::r::Pointer<struct ArrowSchema> allocate_arrow_schema() { return {}; }
 
 // [[arrow::export]]
diff --git a/r/tests/testthat/test-python-flight.R b/r/tests/testthat/test-python-flight.R
new file mode 100644
index 0000000..dbd2ba9
--- /dev/null
+++ b/r/tests/testthat/test-python-flight.R
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Assumes:
+# * We've already done arrow::install_pyarrow()
+# * R -e 'arrow::load_flight_server("demo_flight_server")$DemoFlightServer(port = 8089)$serve()'
+# TODO: set up CI job to test this, or some way of running a background process
+if (process_is_running("demo_flight_server")) {
+  client <- flight_connect(port = 8089)
+  flight_obj <- tempfile()
+
+  test_that("flight_path_exists", {
+    expect_false(flight_path_exists(client, flight_obj))
+    expect_false(flight_obj %in% list_flights(client))
+  })
+
+  test_that("flight_put", {
+    flight_put(client, example_data, path = flight_obj)
+    expect_true(flight_path_exists(client, flight_obj))
+    expect_true(flight_obj %in% list_flights(client))
+  })
+
+  test_that("flight_get", {
+    expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)
+  })
+
+  test_that("flight_put with RecordBatch", {
+    flight_obj2 <- tempfile()
+    flight_put(client, RecordBatch$create(example_data), path = flight_obj2)
+    expect_identical(as.data.frame(flight_get(client, flight_obj2)), example_data)
+  })
+
+  test_that("flight_put with overwrite = FALSE", {
+    expect_error(
+      flight_put(client, example_with_times, path = flight_obj, overwrite = FALSE),
+      "exists"
+    )
+    # Default is TRUE so this will overwrite
+    flight_put(client, example_with_times, path = flight_obj)
+    expect_identical(as.data.frame(flight_get(client, flight_obj)), example_with_times)
+  })
+
+} else {
+  # Kinda hacky, let's put a skipped test here, just so we note that the tests
+  # didn't run
+  test_that("Flight tests", {
+    skip("Flight server is not running")
+  })
+}
diff --git a/r/tests/testthat/test-python.R b/r/tests/testthat/test-python.R
index 6184f69..a073b73 100644
--- a/r/tests/testthat/test-python.R
+++ b/r/tests/testthat/test-python.R
@@ -38,7 +38,7 @@ test_that("Array from Python", {
 
 test_that("Array to Python", {
   skip_if_no_pyarrow()
-  pa <- reticulate::import("pyarrow", convert=FALSE)
+  pa <- reticulate::import("pyarrow", convert = FALSE)
   r <- Array$create(c(1, 2, 3))
   py <- pa$concat_arrays(list(r))
   expect_is(py, "pyarrow.lib.Array")
@@ -47,8 +47,8 @@ test_that("Array to Python", {
 
 test_that("RecordBatch to/from Python", {
   skip_if_no_pyarrow()
-  pa <- reticulate::import("pyarrow", convert=FALSE)
-  batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+  pa <- reticulate::import("pyarrow", convert = FALSE)
+  batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
   py <- reticulate::r_to_py(batch)
   expect_is(py, "pyarrow.lib.RecordBatch")
   expect_equal(reticulate::py_to_r(py), batch)
@@ -56,8 +56,8 @@ test_that("RecordBatch to/from Python", {
 
 test_that("Table and ChunkedArray from Python", {
   skip_if_no_pyarrow()
-  pa <- reticulate::import("pyarrow", convert=FALSE)
-  batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+  pa <- reticulate::import("pyarrow", convert = FALSE)
+  batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
   tab <- Table$create(batch, batch)
   pybatch <- reticulate::r_to_py(batch)
   pytab <- pa$Table$from_batches(list(pybatch, pybatch))
@@ -69,8 +69,7 @@ test_that("Table and ChunkedArray from Python", {
 
 test_that("Table and ChunkedArray to Python", {
   skip_if_no_pyarrow()
-  pa <- reticulate::import("pyarrow", convert=FALSE)
-  batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+  batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
   tab <- Table$create(batch, batch)
 
   pychunked <- reticulate::r_to_py(tab$col1)
@@ -81,3 +80,21 @@ test_that("Table and ChunkedArray to Python", {
   expect_is(pytab, "pyarrow.lib.Table")
   expect_equal(reticulate::py_to_r(pytab), tab)
 })
+
+test_that("RecordBatch with metadata roundtrip", {
+  skip_if_no_pyarrow()
+  batch <- RecordBatch$create(example_with_times)
+  pybatch <- reticulate::r_to_py(batch)
+  expect_is(pybatch, "pyarrow.lib.RecordBatch")
+  expect_equal(reticulate::py_to_r(pybatch), batch)
+  expect_identical(as.data.frame(reticulate::py_to_r(pybatch)), example_with_times)
+})
+
+test_that("Table with metadata roundtrip", {
+  skip_if_no_pyarrow()
+  tab <- Table$create(example_with_times)
+  pytab <- reticulate::r_to_py(tab)
+  expect_is(pytab, "pyarrow.lib.Table")
+  expect_equal(reticulate::py_to_r(pytab), tab)
+  expect_identical(as.data.frame(reticulate::py_to_r(pytab)), example_with_times)
+})
diff --git a/r/vignettes/flight.Rmd b/r/vignettes/flight.Rmd
index 202230f..9ca10d7 100644
--- a/r/vignettes/flight.Rmd
+++ b/r/vignettes/flight.Rmd
@@ -51,7 +51,7 @@ In a different R process, let's connect to it and put some data in it.
 library(arrow)
 client <- flight_connect(port = 8089)
 # Upload some data to our server so there's something to demo
-push_data(client, iris, path = "test_data/iris")
+flight_put(client, iris, path = "test_data/iris")
 ```
 
 Now, in a new R process, let's connect to the server and pull the data we