You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/06/27 19:52:22 UTC
[arrow] branch master updated: ARROW-5718: [R] auto splice data
frames in record_batch() and table()
This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4e6759c ARROW-5718: [R] auto splice data frames in record_batch() and table()
4e6759c is described below
commit 4e6759c11cb4a397cfd825d24775b3c1fe1d94db
Author: Romain Francois <ro...@rstudio.com>
AuthorDate: Thu Jun 27 15:52:13 2019 -0400
ARROW-5718: [R] auto splice data frames in record_batch() and table()
Unnamed elements of `...` in `record_batch(...)` or `table(...)` that are data frames are automatically spliced (any other unnamed element is an error):
``` r
library(arrow, warn.conflicts = FALSE)
library(tibble)
df <- tibble(x = 1:10, y = letters[1:10])
```
``` r
# auto splicing
batch <- record_batch(df, z = 1:10)
as_tibble(batch)
#> # A tibble: 10 x 3
#> x y z
#> <int> <chr> <int>
#> 1 1 a 1
#> 2 2 b 2
#> 3 3 c 3
#> 4 4 d 4
#> 5 5 e 5
#> 6 6 f 6
#> 7 7 g 7
#> 8 8 h 8
#> 9 9 i 9
#> 10 10 j 10
# same as explicit splicing
batch <- record_batch(!!!df, z = 1:10)
as_tibble(batch)
#> # A tibble: 10 x 3
#> x y z
#> <int> <chr> <int>
#> 1 1 a 1
#> 2 2 b 2
#> 3 3 c 3
#> 4 4 d 4
#> 5 5 e 5
#> 6 6 f 6
#> 7 7 g 7
#> 8 8 h 8
#> 9 9 i 9
#> 10 10 j 10
```
``` r
# auto splicing
tab <- table(df, z = 1:10)
as_tibble(tab)
#> # A tibble: 10 x 3
#> x y z
#> <int> <chr> <int>
#> 1 1 a 1
#> 2 2 b 2
#> 3 3 c 3
#> 4 4 d 4
#> 5 5 e 5
#> 6 6 f 6
#> 7 7 g 7
#> 8 8 h 8
#> 9 9 i 9
#> 10 10 j 10
# same as explicit splicing
tab <- table(!!!df, z = 1:10)
as_tibble(tab)
#> # A tibble: 10 x 3
#> x y z
#> <int> <chr> <int>
#> 1 1 a 1
#> 2 2 b 2
#> 3 3 c 3
#> 4 4 d 4
#> 5 5 e 5
#> 6 6 f 6
#> 7 7 g 7
#> 8 8 h 8
#> 9 9 i 9
#> 10 10 j 10
```
In particular, this gives us back `record_batch(<data.frame>)` and `table(<data.frame>)`:
``` r
library(arrow, warn.conflicts = FALSE)
library(tibble)
df <- tibble(x = 1:10, y = letters[1:10])
record_batch(df)
#> arrow::RecordBatch
table(df)
#> arrow::Table
```
Author: Romain Francois <ro...@rstudio.com>
Closes #4704 from romainfrancois/ARROW-5718/df_auto_splice and squashes the following commits:
79a70412c <Romain Francois> more tests about auto splicing in record_batch()
28bc470c9 <Romain Francois> + comment
61902ab7c <Romain Francois> table() auto splicing
0a4892c21 <Romain Francois> record_batch() auto splicing of data frames
5eabd202a <Romain Francois> trim redundant code
---
r/R/RecordBatch.R | 4 ++
r/R/Table.R | 4 ++
r/src/arrow_types.h | 2 +
r/src/recordbatch.cpp | 113 ++++++++++++++++++++++++++----------
r/src/table.cpp | 74 ++++++++++++++++-------
r/tests/testthat/test-RecordBatch.R | 53 ++++++++++++++++-
r/tests/testthat/test-Table.R | 18 ++++++
7 files changed, 213 insertions(+), 55 deletions(-)
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 8c90254..6446c95 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -98,6 +98,10 @@
#' @export
record_batch <- function(..., schema = NULL){
arrays <- list2(...)
+ # making sure there are always names
+ if (is.null(names(arrays))) {
+ names(arrays) <- rep_len("", length(arrays))
+ }
stopifnot(length(arrays) > 0)
shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays))
}
diff --git a/r/R/Table.R b/r/R/Table.R
index 1aec916..51320fd 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -74,6 +74,10 @@
#' @export
table <- function(..., schema = NULL){
dots <- list2(...)
+ # making sure there are always names
+ if (is.null(names(dots))) {
+ names(dots) <- rep_len("", length(dots))
+ }
stopifnot(length(dots) > 0)
shared_ptr(`arrow::Table`, Table__from_dots(dots, schema))
}
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 3ff1a3d..4259a48 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -197,6 +197,8 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame
namespace arrow {
namespace r {
+Status count_fields(SEXP lst, int* out);
+
std::shared_ptr<arrow::Array> Array__from_vector(
SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_infered);
diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp
index 289c3a9..11d3d6a 100644
--- a/r/src/recordbatch.cpp
+++ b/r/src/recordbatch.cpp
@@ -148,6 +148,9 @@ std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__InputStream__Schema(
return batch;
}
+namespace arrow {
+namespace r {
+
arrow::Status check_consistent_array_size(
const std::vector<std::shared_ptr<arrow::Array>>& arrays, int64_t* num_rows) {
if (arrays.size()) {
@@ -163,30 +166,69 @@ arrow::Status check_consistent_array_size(
return arrow::Status::OK();
}
+Status count_fields(SEXP lst, int* out) {
+ int res = 0;
+ R_xlen_t n = XLENGTH(lst);
+ SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+ for (R_xlen_t i = 0; i < n; i++) {
+ if (LENGTH(STRING_ELT(names, i)) > 0) {
+ ++res;
+ } else {
+ SEXP x = VECTOR_ELT(lst, i);
+ if (Rf_inherits(x, "data.frame")) {
+ res += XLENGTH(x);
+ } else {
+ return Status::RError(
+ "only data frames are allowed as unnamed arguments to be auto spliced");
+ }
+ }
+ }
+ *out = res;
+ return Status::OK();
+}
+
+} // namespace r
+} // namespace arrow
+
std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays__known_schema(
const std::shared_ptr<arrow::Schema>& schema, SEXP lst) {
- R_xlen_t n_arrays = XLENGTH(lst);
- if (schema->num_fields() != n_arrays) {
+ int num_fields;
+ STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
+
+ if (schema->num_fields() != num_fields) {
Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied",
- schema->num_fields(), n_arrays);
+ schema->num_fields(), num_fields);
}
// convert lst to a vector of arrow::Array
- std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
+ std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
- bool has_names = !Rf_isNull(names);
- for (R_xlen_t i = 0; i < n_arrays; i++) {
- if (has_names && schema->field(i)->name() != CHAR(STRING_ELT(names, i))) {
- Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1,
- schema->field(i)->name(), CHAR(STRING_ELT(names, i)));
+ auto fill_array = [&arrays, &schema](int j, SEXP x, SEXP name) {
+ if (schema->field(j)->name() != CHAR(name)) {
+ Rcpp::stop("field at index %d has name '%s' != '%s'", j + 1,
+ schema->field(j)->name(), CHAR(name));
+ }
+ arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), false);
+ };
+
+ for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+ SEXP name_i = STRING_ELT(names, i);
+ SEXP x_i = VECTOR_ELT(lst, i);
+
+ if (LENGTH(name_i) == 0) {
+ SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+ for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+ fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+ }
+ } else {
+ fill_array(j, x_i, name_i);
+ j++;
}
- arrays[i] =
- arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false);
}
int64_t num_rows = 0;
- STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+ STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));
return arrow::RecordBatch::Make(schema, num_rows, arrays);
}
@@ -197,38 +239,45 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP schema_sxp, SE
arrow::r::extract<arrow::Schema>(schema_sxp), lst);
}
- R_xlen_t n_arrays = XLENGTH(lst);
+ int num_fields;
+ STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
// convert lst to a vector of arrow::Array
- std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
- for (R_xlen_t i = 0; i < n_arrays; i++) {
- arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue);
+ std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
+ std::vector<std::string> arrays_names(num_fields);
+ SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+
+ auto fill_array = [&arrays, &arrays_names](int j, SEXP x, SEXP name) {
+ arrays[j] = Array__from_vector(x, R_NilValue);
+ arrays_names[j] = CHAR(name);
+ };
+
+ for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+ SEXP name_i = STRING_ELT(names, i);
+ SEXP x_i = VECTOR_ELT(lst, i);
+ if (LENGTH(name_i) == 0) {
+ SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+ for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+ fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+ }
+ } else {
+ fill_array(j, x_i, name_i);
+ j++;
+ }
}
// generate schema from the types that have been infered
std::shared_ptr<arrow::Schema> schema;
- if (Rf_inherits(schema_sxp, "arrow::Schema")) {
- schema = arrow::r::extract<arrow::Schema>(schema_sxp);
- } else {
- Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
- std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
- for (R_xlen_t i = 0; i < n_arrays; i++) {
- fields[i] =
- std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
- }
- schema = std::make_shared<arrow::Schema>(std::move(fields));
- }
- Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
- std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
- for (R_xlen_t i = 0; i < n_arrays; i++) {
- fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
+ std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
+ for (R_xlen_t i = 0; i < num_fields; i++) {
+ fields[i] = std::make_shared<arrow::Field>(arrays_names[i], arrays[i]->type());
}
schema = std::make_shared<arrow::Schema>(std::move(fields));
// check all sizes are the same
int64_t num_rows = 0;
- STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+ STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));
return arrow::RecordBatch::Make(schema, num_rows, arrays);
}
diff --git a/r/src/table.cpp b/r/src/table.cpp
index 2d9135b..1e958d0 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -117,54 +117,84 @@ std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp) {
return tab;
}
- R_xlen_t n = XLENGTH(lst);
- std::vector<std::shared_ptr<arrow::Column>> columns(n);
+ int num_fields;
+ STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
+
+ std::vector<std::shared_ptr<arrow::Column>> columns(num_fields);
std::shared_ptr<arrow::Schema> schema;
if (Rf_isNull(schema_sxp)) {
// infer the schema from the ...
- std::vector<std::shared_ptr<arrow::Field>> fields(n);
- Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
+ std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
+ SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
- for (R_xlen_t i = 0; i < n; i++) {
- SEXP x = VECTOR_ELT(lst, i);
+ auto fill_one_column = [&columns, &fields](int j, SEXP x, SEXP name) {
if (Rf_inherits(x, "arrow::Column")) {
- columns[i] = arrow::r::extract<arrow::Column>(x);
- fields[i] = columns[i]->field();
+ columns[j] = arrow::r::extract<arrow::Column>(x);
+ fields[j] = columns[j]->field();
} else if (Rf_inherits(x, "arrow::ChunkedArray")) {
auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
- fields[i] =
- std::make_shared<arrow::Field>(std::string(names[i]), chunked_array->type());
- columns[i] = std::make_shared<arrow::Column>(fields[i], chunked_array);
+ fields[j] = std::make_shared<arrow::Field>(CHAR(name), chunked_array->type());
+ columns[j] = std::make_shared<arrow::Column>(fields[j], chunked_array);
} else if (Rf_inherits(x, "arrow::Array")) {
auto array = arrow::r::extract<arrow::Array>(x);
- fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
- columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+ fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
+ columns[j] = std::make_shared<arrow::Column>(fields[j], array);
} else {
auto array = Array__from_vector(x, R_NilValue);
- fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
- columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+ fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
+ columns[j] = std::make_shared<arrow::Column>(fields[j], array);
+ }
+ };
+
+ for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+ SEXP name_i = STRING_ELT(names, i);
+ SEXP x_i = VECTOR_ELT(lst, i);
+
+ if (LENGTH(name_i) == 0) {
+ SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+ for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+ fill_one_column(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+ }
+ } else {
+ fill_one_column(j, x_i, name_i);
+ j++;
}
}
+
schema = std::make_shared<arrow::Schema>(std::move(fields));
} else {
// use the schema that is given
schema = arrow::r::extract<arrow::Schema>(schema_sxp);
- for (R_xlen_t i = 0; i < n; i++) {
- SEXP x = VECTOR_ELT(lst, i);
+ auto fill_one_column = [&columns, &schema](int j, SEXP x) {
if (Rf_inherits(x, "arrow::Column")) {
- columns[i] = arrow::r::extract<arrow::Column>(x);
+ columns[j] = arrow::r::extract<arrow::Column>(x);
} else if (Rf_inherits(x, "arrow::ChunkedArray")) {
auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
- columns[i] = std::make_shared<arrow::Column>(schema->field(i), chunked_array);
+ columns[j] = std::make_shared<arrow::Column>(schema->field(j), chunked_array);
} else if (Rf_inherits(x, "arrow::Array")) {
auto array = arrow::r::extract<arrow::Array>(x);
- columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+ columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
} else {
- auto type = schema->field(i)->type();
+ auto type = schema->field(j)->type();
auto array = arrow::r::Array__from_vector(x, type, false);
- columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+ columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
+ }
+ };
+
+ SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+ for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+ SEXP name_i = STRING_ELT(names, i);
+ SEXP x_i = VECTOR_ELT(lst, i);
+
+ if (LENGTH(name_i) == 0) {
+ for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+ fill_one_column(j, VECTOR_ELT(x_i, k));
+ }
+ } else {
+ fill_one_column(j, x_i);
+ j++;
}
}
}
diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R
index c77e1f1..d5a141c 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -153,10 +153,24 @@ test_that("record_batch() handles arrow::Array", {
test_that("record_batch() handles data frame columns", {
tib <- tibble::tibble(x = 1:10, y = 1:10)
+ # because tib is named here, this becomes a struct array
batch <- record_batch(a = 1:10, b = tib)
- expect_equal(batch$schema, schema(a = int32(), struct(x = int32(), y = int32())))
+ expect_equal(batch$schema,
+ schema(
+ a = int32(),
+ struct(x = int32(), y = int32())
+ )
+ )
out <- as.data.frame(batch)
expect_equivalent(out, tibble::tibble(a = 1:10, b = tib))
+
+ # if not named, columns from tib are auto spliced
+ batch2 <- record_batch(a = 1:10, tib)
+ expect_equal(batch$schema,
+ schema(a = int32(), x = int32(), y = int32())
+ )
+ out <- as.data.frame(batch2)
+ expect_equivalent(out, tibble::tibble(a = 1:10, !!!tib))
})
test_that("record_batch() handles data frame columns with schema spec", {
@@ -170,3 +184,40 @@ test_that("record_batch() handles data frame columns with schema spec", {
schema <- schema(a = int32(), b = struct(x = int16(), y = utf8()))
expect_error(record_batch(a = 1:10, b = tib, schema = schema))
})
+
+test_that("record_batch() auto splices (ARROW-5718)", {
+ df <- tibble::tibble(x = 1:10, y = letters[1:10])
+ batch1 <- record_batch(df)
+ batch2 <- record_batch(!!!df)
+ expect_equal(batch1, batch2)
+ expect_equal(batch1$schema, schema(x = int32(), y = utf8()))
+ expect_equivalent(as.data.frame(batch1), df)
+
+ batch3 <- record_batch(df, z = 1:10)
+ batch4 <- record_batch(!!!df, z = 1:10)
+ expect_equal(batch3, batch4)
+ expect_equal(batch3$schema, schema(x = int32(), y = utf8(), z = int32()))
+ expect_equivalent(as.data.frame(batch3), cbind(df, data.frame(z = 1:10)))
+
+ s <- schema(x = float64(), y = utf8())
+ batch5 <- record_batch(df, schema = s)
+ batch6 <- record_batch(!!!df, schema = s)
+ expect_equal(batch5, batch6)
+ expect_equal(batch5$schema, s)
+ expect_equivalent(as.data.frame(batch5), df)
+
+ s2 <- schema(x = float64(), y = utf8(), z = int16())
+ batch7 <- record_batch(df, z = 1:10, schema = s2)
+ batch8 <- record_batch(!!!df, z = 1:10, schema = s2)
+ expect_equal(batch7, batch8)
+ expect_equal(batch7$schema, s2)
+ expect_equivalent(as.data.frame(batch7), cbind(df, data.frame(z = 1:10)))
+})
+
+test_that("record_batch() only auto splice data frames", {
+ expect_error(
+ record_batch(1:10),
+ regexp = "only data frames are allowed as unnamed arguments to be auto spliced"
+ )
+})
+
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 56b0969..ae948f2 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -119,3 +119,21 @@ test_that("table() handles ... of arrays, chunked arrays, vectors", {
tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10])
)
})
+
+test_that("table() auto splices (ARROW-5718)", {
+ df <- tibble::tibble(x = 1:10, y = letters[1:10])
+
+ tab1 <- table(df)
+ tab2 <- table(!!!df)
+ expect_equal(tab1, tab2)
+ expect_equal(tab1$schema, schema(x = int32(), y = utf8()))
+ expect_equivalent(as.data.frame(tab1), df)
+
+ s <- schema(x = float64(), y = utf8())
+ tab3 <- table(df, schema = s)
+ tab4 <- table(!!!df, schema = s)
+ expect_equal(tab3, tab4)
+ expect_equal(tab3$schema, s)
+ expect_equivalent(as.data.frame(tab3), df)
+})
+