You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by np...@apache.org on 2020/12/30 16:29:33 UTC
[arrow] branch master updated: ARROW-10416: [R] Support Tables in
Flight
This is an automated email from the ASF dual-hosted git repository.
npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 635f12b ARROW-10416: [R] Support Tables in Flight
635f12b is described below
commit 635f12b3855994fff2de168f1ca14135769118f1
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Wed Dec 30 08:28:41 2020 -0800
ARROW-10416: [R] Support Tables in Flight
In addition to the feature described in the title, this PR
* renames `push_data()` to `flight_put()` and adds an `overwrite` argument to optionally check for the existence of a flight with the same name
* adds `list_flights()` and `flight_path_exists()` to see available resources
* adds tests for the flight bindings, run if the demo flight server is found to be running (managing that process and/or running this in CI is deferred)
* adds `r_to_py` and `py_to_r` methods for Schema objects; this fixes an issue where schema metadata was lost when converting between Python and R due to the indirect way that Tables are passed
Closes #9039 from nealrichardson/flight-tables
Authored-by: Neal Richardson <ne...@gmail.com>
Signed-off-by: Neal Richardson <ne...@gmail.com>
---
r/NAMESPACE | 4 ++-
r/R/arrow-package.R | 16 ++++-----
r/R/arrowExports.R | 4 +++
r/R/flight.R | 68 +++++++++++++++++++++++++++--------
r/R/python.R | 25 +++++++++++--
r/_pkgdown.yml | 3 +-
r/man/flight_get.Rd | 4 +--
r/man/{push_data.Rd => flight_put.Rd} | 12 ++++---
r/man/list_flights.Rd | 23 ++++++++++++
r/src/arrowExports.cpp | 16 +++++++++
r/src/py-to-r.cpp | 6 ++++
r/tests/testthat/test-python-flight.R | 63 ++++++++++++++++++++++++++++++++
r/tests/testthat/test-python.R | 31 ++++++++++++----
r/vignettes/flight.Rmd | 2 +-
14 files changed, 235 insertions(+), 42 deletions(-)
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 316679a..4c50dd9 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -217,6 +217,8 @@ export(fixed_size_binary)
export(fixed_size_list_of)
export(flight_connect)
export(flight_get)
+export(flight_path_exists)
+export(flight_put)
export(float)
export(float16)
export(float32)
@@ -233,6 +235,7 @@ export(large_binary)
export(large_list_of)
export(large_utf8)
export(last_col)
+export(list_flights)
export(list_of)
export(load_flight_server)
export(map_batches)
@@ -244,7 +247,6 @@ export(null)
export(num_range)
export(one_of)
export(open_dataset)
-export(push_data)
export(read_arrow)
export(read_csv_arrow)
export(read_delim_arrow)
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 5d1dbbe..8ec3863 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -38,16 +38,13 @@
s3_register(m, cl)
}
}
-
s3_register("dplyr::tbl_vars", "arrow_dplyr_query")
- s3_register("reticulate::py_to_r", "pyarrow.lib.Array")
- s3_register("reticulate::py_to_r", "pyarrow.lib.RecordBatch")
- s3_register("reticulate::py_to_r", "pyarrow.lib.ChunkedArray")
- s3_register("reticulate::py_to_r", "pyarrow.lib.Table")
- s3_register("reticulate::r_to_py", "Array")
- s3_register("reticulate::r_to_py", "RecordBatch")
- s3_register("reticulate::r_to_py", "ChunkedArray")
- s3_register("reticulate::r_to_py", "Table")
+
+ for (cl in c("Array", "RecordBatch", "ChunkedArray", "Table", "Schema")) {
+ s3_register("reticulate::py_to_r", paste0("pyarrow.lib.", cl))
+ s3_register("reticulate::r_to_py", cl)
+ }
+
invisible()
}
@@ -128,4 +125,3 @@ ArrowObject <- R6Class("ArrowObject",
+# all.equal() method for Arrow objects: delegates to the object's $Equals(),
+# comparing metadata only when `check.attributes` is TRUE.
all.equal.ArrowObject <- function(target, current, ..., check.attributes = TRUE) {
target$Equals(current, check_metadata = check.attributes)
}
-
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index bea0217..65c4dcd 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1272,6 +1272,10 @@ ImportRecordBatch <- function(array, schema){
.Call(`_arrow_ImportRecordBatch` , array, schema)
}
+# Calls the compiled `_arrow_ImportSchema` routine. `schema` is an ArrowSchema
+# pointer, such as the one returned by allocate_arrow_schema().
+ImportSchema <- function(schema){
+ .Call(`_arrow_ImportSchema` , schema)
+}
+
allocate_arrow_schema <- function(){
.Call(`_arrow_allocate_arrow_schema` )
}
diff --git a/r/R/flight.R b/r/R/flight.R
index 7721661..486c59a 100644
--- a/r/R/flight.R
+++ b/r/R/flight.R
@@ -41,18 +41,27 @@ flight_connect <- function(host = "localhost", port, scheme = "grpc+tcp") {
#' Send data to a Flight server
#'
#' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
-#' @param data `data.frame` or [RecordBatch] to upload
+#' @param data `data.frame`, [RecordBatch], or [Table] to upload
#' @param path string identifier to store the data under
+#' @param overwrite logical: if `path` exists on `client` already, should we
+#' replace it with the contents of `data`? Default is `TRUE`; if `FALSE` and
+#' `path` exists, the function will error.
#' @return `client`, invisibly.
#' @export
-push_data <- function(client, data, path) {
- if (inherits(data, "data.frame")) {
- data <- record_batch(data)
+flight_put <- function(client, data, path, overwrite = TRUE) {
+ # Checked before anything is uploaded, so a refused overwrite sends no data
+ if (!overwrite && flight_path_exists(client, path)) {
+ stop(path, " exists.", call. = FALSE)
+ }
+ # data.frames are converted to an Arrow Table; other inputs pass through as-is
+ if (is.data.frame(data)) {
+ data <- Table$create(data)
}
- # TODO: this is only RecordBatch; handle Table
py_data <- reticulate::r_to_py(data)
+ # Only the first element of do_put()'s result is used: it is the stream writer
writer <- client$do_put(descriptor_for_path(path), py_data$schema)[[1]]
- writer$write_batch(py_data)
+ # A RecordBatch is sent with write_batch(); anything else (a Table) with write_table()
+ if (inherits(data, "RecordBatch")) {
+ writer$write_batch(py_data)
+ } else {
+ writer$write_table(py_data)
+ }
writer$close()
invisible(client)
}
@@ -60,22 +69,53 @@ push_data <- function(client, data, path) {
#' Get data from a Flight server
#'
#' @param client `pyarrow.flight.FlightClient`, as returned by [flight_connect()]
-#' @param path string identifier under which the data is stored
-#' @return A [RecordBatch]
+#' @param path string identifier under which data is stored
+#' @return A [Table]
#' @export
flight_get <- function(client, path) {
+ # Open a stream for `path` and read all of it into a single Table
+ reader <- flight_reader(client, path)
+ reader$read_all()
+}
+
+# TODO: could use this as a RecordBatch iterator, call $read_chunk() on this
+# Internal helper: looks up the FlightInfo for `path` and returns the result of
+# client$do_get() on the first endpoint's ticket (the reader used by flight_get).
+flight_reader <- function(client, path) {
info <- client$get_flight_info(descriptor_for_path(path))
# Hack: assume a single ticket, on the same server as client is already connected
+ # NOTE(review): [[1]] errors if the server reports no endpoints for `path`
ticket <- info$endpoints[[1]]$ticket
- reader <- client$do_get(ticket)
- # Next hack: assume a single record batch
- # TODO: read_all() instead? Or read all chunks and build Table in R?
- chunk <- reader$read_chunk()
- # Drop $app_metadata and just return the data
- chunk$data
+ client$do_get(ticket)
}
+# Build the pyarrow FlightDescriptor that identifies `path` on a Flight server
descriptor_for_path <- function(path) {
pa <- reticulate::import("pyarrow")
pa$flight$FlightDescriptor$for_path(path)
}
+
+#' See available resources on a Flight server
+#'
+#' @inheritParams flight_get
+#' @return `list_flights()` returns a character vector of paths.
+#' `flight_path_exists()` returns a logical value, the equivalent of `path %in% list_flights()`
+#' @export
+list_flights <- function(client) {
+  generator <- client$list_flights()
+  paths <- reticulate::iterate(
+    generator,
+    function(x) as.character(x$descriptor$path[[1]])
+  )
+  # reticulate::iterate() only simplifies to an atomic vector when at least
+  # one element is yielded; a server with no flights gives list(). Coerce so
+  # the documented character-vector return (character(0) when empty) holds.
+  as.character(unlist(paths))
+}
+
+#' @rdname list_flights
+#' @export
+flight_path_exists <- function(client, path) {
+  # Return the tryCatch() value directly. Ending the function with an
+  # assignment (as before) made the logical result invisible at the console.
+  tryCatch(
+    {
+      client$get_flight_info(descriptor_for_path(path))
+      TRUE
+    },
+    error = function(e) {
+      # A missing path is reported as an ArrowKeyError and maps to FALSE; any
+      # other failure (e.g. connection problems) is re-raised, not hidden.
+      if (!any(grepl("ArrowKeyError", conditionMessage(e)))) {
+        stop(e)
+      }
+      FALSE
+    }
+  )
+}
diff --git a/r/R/python.R b/r/R/python.R
index 739f048..b200d93 100644
--- a/r/R/python.R
+++ b/r/R/python.R
@@ -90,7 +90,7 @@ py_to_r.pyarrow.lib.ChunkedArray <- function(x, ...) {
r_to_py.Table <- function(x, convert = FALSE) {
# Import with convert = FALSE so that `_import_from_c` returns a Python object
pa <- reticulate::import("pyarrow", convert = FALSE)
- out <- pa$Table$from_arrays(x$columns, names = names(x))
+ out <- pa$Table$from_arrays(x$columns, schema = x$schema)
# But set the convert attribute on the return object to the requested value
assign("convert", convert, out)
out
@@ -100,7 +100,28 @@ py_to_r.pyarrow.lib.Table <- function(x, ...) {
colnames <- maybe_py_to_r(x$column_names)
r_cols <- maybe_py_to_r(x$columns)
names(r_cols) <- colnames
- Table$create(!!!r_cols)
+ Table$create(!!!r_cols, schema = maybe_py_to_r(x$schema))
+}
+
+py_to_r.pyarrow.lib.Schema <- function(x, ...) {
+  # Scratch ArrowSchema struct, freed on exit whether or not the import works
+  ptr <- allocate_arrow_schema()
+  on.exit(delete_arrow_schema(ptr))
+
+  # pyarrow fills the struct; the R-side Schema is then built from it
+  x$`_export_to_c`(ptr)
+  ImportSchema(ptr)
+}
+
+r_to_py.Schema <- function(x, convert = FALSE) {
+  # Import with convert = FALSE so `_import_from_c` yields a Python object
+  # instead of converting the result straight back to R
+  pa <- reticulate::import("pyarrow", convert = FALSE)
+
+  ptr <- allocate_arrow_schema()
+  on.exit(delete_arrow_schema(ptr))
+
+  # Export the R Schema into the struct, then rebuild it on the Python side
+  ExportSchema(x, ptr)
+  py_schema <- pa$Schema$`_import_from_c`(ptr)
+
+  # Record the caller's requested conversion mode on the returned object
+  assign("convert", convert, py_schema)
+  py_schema
}
maybe_py_to_r <- function(x) {
diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml
index d946f31..b3eb895 100644
--- a/r/_pkgdown.yml
+++ b/r/_pkgdown.yml
@@ -122,8 +122,9 @@ reference:
contents:
- load_flight_server
- flight_connect
- - push_data
- flight_get
+ - flight_put
+ - list_flights
- title: File systems
contents:
- s3_bucket
diff --git a/r/man/flight_get.Rd b/r/man/flight_get.Rd
index f427b39..a79c4d7 100644
--- a/r/man/flight_get.Rd
+++ b/r/man/flight_get.Rd
@@ -9,10 +9,10 @@ flight_get(client, path)
\arguments{
\item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
-\item{path}{string identifier under which the data is stored}
+\item{path}{string identifier under which data is stored}
}
\value{
-A \link{RecordBatch}
+A \link{Table}
}
\description{
Get data from a Flight server
diff --git a/r/man/push_data.Rd b/r/man/flight_put.Rd
similarity index 51%
rename from r/man/push_data.Rd
rename to r/man/flight_put.Rd
index 8fb65c7..13a8da1 100644
--- a/r/man/push_data.Rd
+++ b/r/man/flight_put.Rd
@@ -1,17 +1,21 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/flight.R
-\name{push_data}
-\alias{push_data}
+\name{flight_put}
+\alias{flight_put}
\title{Send data to a Flight server}
\usage{
-push_data(client, data, path)
+flight_put(client, data, path, overwrite = TRUE)
}
\arguments{
\item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
-\item{data}{\code{data.frame} or \link{RecordBatch} to upload}
+\item{data}{\code{data.frame}, \link{RecordBatch}, or \link{Table} to upload}
\item{path}{string identifier to store the data under}
+
+\item{overwrite}{logical: if \code{path} exists on \code{client} already, should we
+replace it with the contents of \code{data}? Default is \code{TRUE}; if \code{FALSE} and
+\code{path} exists, the function will error.}
}
\value{
\code{client}, invisibly.
diff --git a/r/man/list_flights.Rd b/r/man/list_flights.Rd
new file mode 100644
index 0000000..d8ebb0d
--- /dev/null
+++ b/r/man/list_flights.Rd
@@ -0,0 +1,23 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/flight.R
+\name{list_flights}
+\alias{list_flights}
+\alias{flight_path_exists}
+\title{See available resources on a Flight server}
+\usage{
+list_flights(client)
+
+flight_path_exists(client, path)
+}
+\arguments{
+\item{client}{\code{pyarrow.flight.FlightClient}, as returned by \code{\link[=flight_connect]{flight_connect()}}}
+
+\item{path}{string identifier under which data is stored}
+}
+\value{
+\code{list_flights()} returns a character vector of paths.
+\code{flight_path_exists()} returns a logical value, the equivalent of \code{path \%in\% list_flights()}
+}
+\description{
+See available resources on a Flight server
+}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index f4314a9..093949d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4990,6 +4990,21 @@ extern "C" SEXP _arrow_ImportRecordBatch(SEXP array_sexp, SEXP schema_sexp){
// py-to-r.cpp
#if defined(ARROW_R_WITH_ARROW)
+// .Call entry point for ImportSchema(): converts the SEXP argument to an
+// ArrowSchema pointer and returns the imported arrow::Schema as a SEXP. The
+// #else branch (built without ARROW_R_WITH_ARROW) only raises an R error.
+std::shared_ptr<arrow::Schema> ImportSchema(arrow::r::Pointer<struct ArrowSchema> schema);
+extern "C" SEXP _arrow_ImportSchema(SEXP schema_sexp){
+BEGIN_CPP11
+ arrow::r::Input<arrow::r::Pointer<struct ArrowSchema>>::type schema(schema_sexp);
+ return cpp11::as_sexp(ImportSchema(schema));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_ImportSchema(SEXP schema_sexp){
+ Rf_error("Cannot call ImportSchema(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// py-to-r.cpp
+#if defined(ARROW_R_WITH_ARROW)
arrow::r::Pointer<struct ArrowSchema> allocate_arrow_schema();
extern "C" SEXP _arrow_allocate_arrow_schema(){
BEGIN_CPP11
@@ -6724,6 +6739,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_parquet___arrow___FileReader__GetSchema", (DL_FUNC) &_arrow_parquet___arrow___FileReader__GetSchema, 1},
{ "_arrow_ImportArray", (DL_FUNC) &_arrow_ImportArray, 2},
{ "_arrow_ImportRecordBatch", (DL_FUNC) &_arrow_ImportRecordBatch, 2},
+ { "_arrow_ImportSchema", (DL_FUNC) &_arrow_ImportSchema, 1},
{ "_arrow_allocate_arrow_schema", (DL_FUNC) &_arrow_allocate_arrow_schema, 0},
{ "_arrow_delete_arrow_schema", (DL_FUNC) &_arrow_delete_arrow_schema, 1},
{ "_arrow_allocate_arrow_array", (DL_FUNC) &_arrow_allocate_arrow_array, 0},
diff --git a/r/src/py-to-r.cpp b/r/src/py-to-r.cpp
index 5677791..a571cfa 100644
--- a/r/src/py-to-r.cpp
+++ b/r/src/py-to-r.cpp
@@ -35,6 +35,12 @@ std::shared_ptr<arrow::RecordBatch> ImportRecordBatch(
}
+// Import an ArrowSchema struct (e.g. one filled by pyarrow's `_export_to_c`)
+// into an arrow::Schema. A failed import is passed to ValueOrStop(), which
+// presumably signals it as an R error.
// [[arrow::export]]
+std::shared_ptr<arrow::Schema> ImportSchema(
+ arrow::r::Pointer<struct ArrowSchema> schema) {
+ return ValueOrStop(arrow::ImportSchema(schema));
+}
+
+// [[arrow::export]]
arrow::r::Pointer<struct ArrowSchema> allocate_arrow_schema() { return {}; }
// [[arrow::export]]
diff --git a/r/tests/testthat/test-python-flight.R b/r/tests/testthat/test-python-flight.R
new file mode 100644
index 0000000..dbd2ba9
--- /dev/null
+++ b/r/tests/testthat/test-python-flight.R
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Assumes:
+# * We've already done arrow::install_pyarrow()
+# * R -e 'arrow::load_flight_server("demo_flight_server")$DemoFlightServer(port = 8089)$serve()'
+# TODO: set up CI job to test this, or some way of running a background process
+if (process_is_running("demo_flight_server")) {
+ # Port 8089 matches the server command in the header comment
+ client <- flight_connect(port = 8089)
+ # tempfile() only supplies a unique path name; no local file is created
+ flight_obj <- tempfile()
+
+ # The tests below share `flight_obj` and must run in order: flight_get and
+ # the overwrite test rely on the upload done in the flight_put test.
+ test_that("flight_path_exists", {
+ expect_false(flight_path_exists(client, flight_obj))
+ expect_false(flight_obj %in% list_flights(client))
+ })
+
+ test_that("flight_put", {
+ flight_put(client, example_data, path = flight_obj)
+ expect_true(flight_path_exists(client, flight_obj))
+ expect_true(flight_obj %in% list_flights(client))
+ })
+
+ test_that("flight_get", {
+ expect_identical(as.data.frame(flight_get(client, flight_obj)), example_data)
+ })
+
+ test_that("flight_put with RecordBatch", {
+ # Separate path, so this test does not depend on the earlier uploads
+ flight_obj2 <- tempfile()
+ flight_put(client, RecordBatch$create(example_data), path = flight_obj2)
+ expect_identical(as.data.frame(flight_get(client, flight_obj2)), example_data)
+ })
+
+ test_that("flight_put with overwrite = FALSE", {
+ expect_error(
+ flight_put(client, example_with_times, path = flight_obj, overwrite = FALSE),
+ "exists"
+ )
+ # Default is TRUE so this will overwrite
+ flight_put(client, example_with_times, path = flight_obj)
+ expect_identical(as.data.frame(flight_get(client, flight_obj)), example_with_times)
+ })
+
+} else {
+ # Kinda hacky, let's put a skipped test here, just so we note that the tests
+ # didn't run
+ test_that("Flight tests", {
+ skip("Flight server is not running")
+ })
+}
diff --git a/r/tests/testthat/test-python.R b/r/tests/testthat/test-python.R
index 6184f69..a073b73 100644
--- a/r/tests/testthat/test-python.R
+++ b/r/tests/testthat/test-python.R
@@ -38,7 +38,7 @@ test_that("Array from Python", {
test_that("Array to Python", {
skip_if_no_pyarrow()
- pa <- reticulate::import("pyarrow", convert=FALSE)
+ pa <- reticulate::import("pyarrow", convert = FALSE)
r <- Array$create(c(1, 2, 3))
py <- pa$concat_arrays(list(r))
expect_is(py, "pyarrow.lib.Array")
@@ -47,8 +47,8 @@ test_that("Array to Python", {
test_that("RecordBatch to/from Python", {
skip_if_no_pyarrow()
- pa <- reticulate::import("pyarrow", convert=FALSE)
- batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+ pa <- reticulate::import("pyarrow", convert = FALSE)
+ batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
py <- reticulate::r_to_py(batch)
expect_is(py, "pyarrow.lib.RecordBatch")
expect_equal(reticulate::py_to_r(py), batch)
@@ -56,8 +56,8 @@ test_that("RecordBatch to/from Python", {
test_that("Table and ChunkedArray from Python", {
skip_if_no_pyarrow()
- pa <- reticulate::import("pyarrow", convert=FALSE)
- batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+ pa <- reticulate::import("pyarrow", convert = FALSE)
+ batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
tab <- Table$create(batch, batch)
pybatch <- reticulate::r_to_py(batch)
pytab <- pa$Table$from_batches(list(pybatch, pybatch))
@@ -69,8 +69,7 @@ test_that("Table and ChunkedArray from Python", {
test_that("Table and ChunkedArray to Python", {
skip_if_no_pyarrow()
- pa <- reticulate::import("pyarrow", convert=FALSE)
- batch <- record_batch(col1=c(1, 2, 3), col2=letters[1:3])
+ batch <- record_batch(col1 = c(1, 2, 3), col2 = letters[1:3])
tab <- Table$create(batch, batch)
pychunked <- reticulate::r_to_py(tab$col1)
@@ -81,3 +80,21 @@ test_that("Table and ChunkedArray to Python", {
expect_is(pytab, "pyarrow.lib.Table")
expect_equal(reticulate::py_to_r(pytab), tab)
})
+
+test_that("RecordBatch with metadata roundtrip", {
+ skip_if_no_pyarrow()
+ batch <- RecordBatch$create(example_with_times)
+ pybatch <- reticulate::r_to_py(batch)
+ expect_is(pybatch, "pyarrow.lib.RecordBatch")
+ expect_equal(reticulate::py_to_r(pybatch), batch)
+ # Beyond Arrow-level equality, the R data.frame must come back identical,
+ # so nothing carried in the schema metadata may be lost R -> Python -> R
+ expect_identical(as.data.frame(reticulate::py_to_r(pybatch)), example_with_times)
+})
+
+test_that("Table with metadata roundtrip", {
+ skip_if_no_pyarrow()
+ tab <- Table$create(example_with_times)
+ pytab <- reticulate::r_to_py(tab)
+ expect_is(pytab, "pyarrow.lib.Table")
+ expect_equal(reticulate::py_to_r(pytab), tab)
+ # Same identity check as above, for the Table conversion path
+ expect_identical(as.data.frame(reticulate::py_to_r(pytab)), example_with_times)
+})
diff --git a/r/vignettes/flight.Rmd b/r/vignettes/flight.Rmd
index 202230f..9ca10d7 100644
--- a/r/vignettes/flight.Rmd
+++ b/r/vignettes/flight.Rmd
@@ -51,7 +51,7 @@ In a different R process, let's connect to it and put some data in it.
library(arrow)
client <- flight_connect(port = 8089)
# Upload some data to our server so there's something to demo
-push_data(client, iris, path = "test_data/iris")
+flight_put(client, iris, path = "test_data/iris")
```
Now, in a new R process, let's connect to the server and pull the data we