You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by fs...@apache.org on 2019/06/27 19:52:22 UTC

[arrow] branch master updated: ARROW-5718: [R] auto splice data frames in record_batch() and table()

This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e6759c  ARROW-5718: [R] auto splice data frames in record_batch() and table()
4e6759c is described below

commit 4e6759c11cb4a397cfd825d24775b3c1fe1d94db
Author: Romain Francois <ro...@rstudio.com>
AuthorDate: Thu Jun 27 15:52:13 2019 -0400

    ARROW-5718: [R] auto splice data frames in record_batch() and table()
    
    Unnamed elements of `...` in `record_batch(...)` and `table(...)` are automatically spliced (only data frames may be passed unnamed):
    
    ``` r
    library(arrow, warn.conflicts = FALSE)
    library(tibble)
    
    df <- tibble(x = 1:10, y = letters[1:10])
    ```
    
    ``` r
    
    # auto splicing
    batch <- record_batch(df, z = 1:10)
    as_tibble(batch)
    #> # A tibble: 10 x 3
    #>        x y         z
    #>    <int> <chr> <int>
    #>  1     1 a         1
    #>  2     2 b         2
    #>  3     3 c         3
    #>  4     4 d         4
    #>  5     5 e         5
    #>  6     6 f         6
    #>  7     7 g         7
    #>  8     8 h         8
    #>  9     9 i         9
    #> 10    10 j        10
    
    # same as explicit splicing
    batch <- record_batch(!!!df, z = 1:10)
    as_tibble(batch)
    #> # A tibble: 10 x 3
    #>        x y         z
    #>    <int> <chr> <int>
    #>  1     1 a         1
    #>  2     2 b         2
    #>  3     3 c         3
    #>  4     4 d         4
    #>  5     5 e         5
    #>  6     6 f         6
    #>  7     7 g         7
    #>  8     8 h         8
    #>  9     9 i         9
    #> 10    10 j        10
    ```
    
    ``` r
    
    # auto splicing
    tab <- table(df, z = 1:10)
    as_tibble(tab)
    #> # A tibble: 10 x 3
    #>        x y         z
    #>    <int> <chr> <int>
    #>  1     1 a         1
    #>  2     2 b         2
    #>  3     3 c         3
    #>  4     4 d         4
    #>  5     5 e         5
    #>  6     6 f         6
    #>  7     7 g         7
    #>  8     8 h         8
    #>  9     9 i         9
    #> 10    10 j        10
    
    # same as explicit splicing
    tab <- table(!!!df, z = 1:10)
    as_tibble(tab)
    #> # A tibble: 10 x 3
    #>        x y         z
    #>    <int> <chr> <int>
    #>  1     1 a         1
    #>  2     2 b         2
    #>  3     3 c         3
    #>  4     4 d         4
    #>  5     5 e         5
    #>  6     6 f         6
    #>  7     7 g         7
    #>  8     8 h         8
    #>  9     9 i         9
    #> 10    10 j        10
    ```
    
    In particular, this restores support for `record_batch(<data.frame>)` and `table(<data.frame>)`:
    
    ``` r
    library(arrow, warn.conflicts = FALSE)
    library(tibble)
    
    df <- tibble(x = 1:10, y = letters[1:10])
    record_batch(df)
    #> arrow::RecordBatch
    table(df)
    #> arrow::Table
    ```
    
    Author: Romain Francois <ro...@rstudio.com>
    
    Closes #4704 from romainfrancois/ARROW-5718/df_auto_splice and squashes the following commits:
    
    79a70412c <Romain Francois> more tests about auto splicing in record_batch()
    28bc470c9 <Romain Francois> + comment
    61902ab7c <Romain Francois> table() auto splicing
    0a4892c21 <Romain Francois> record_batch() auto splicing of data frames
    5eabd202a <Romain Francois> trim redundant code
---
 r/R/RecordBatch.R                   |   4 ++
 r/R/Table.R                         |   4 ++
 r/src/arrow_types.h                 |   2 +
 r/src/recordbatch.cpp               | 113 ++++++++++++++++++++++++++----------
 r/src/table.cpp                     |  74 ++++++++++++++++-------
 r/tests/testthat/test-RecordBatch.R |  53 ++++++++++++++++-
 r/tests/testthat/test-Table.R       |  18 ++++++
 7 files changed, 213 insertions(+), 55 deletions(-)

diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 8c90254..6446c95 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -98,6 +98,10 @@
 #' @export
 record_batch <- function(..., schema = NULL){
   arrays <- list2(...)
+  # names are NULL when no argument is named; C++ needs "" to flag auto splicing
+  if (is.null(names(arrays))) {
+    names(arrays) <- rep_len("", length(arrays))
+  }
   stopifnot(length(arrays) > 0)
   shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays))
 }
diff --git a/r/R/Table.R b/r/R/Table.R
index 1aec916..51320fd 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -74,6 +74,10 @@
 #' @export
 table <- function(..., schema = NULL){
   dots <- list2(...)
+  # names are NULL when no argument is named; C++ needs "" to flag auto splicing
+  if (is.null(names(dots))) {
+    names(dots) <- rep_len("", length(dots))
+  }
   stopifnot(length(dots) > 0)
   shared_ptr(`arrow::Table`, Table__from_dots(dots, schema))
 }
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 3ff1a3d..4259a48 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -197,6 +197,8 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame
 namespace arrow {
 namespace r {
 
+Status count_fields(SEXP lst, int* out);
+
 std::shared_ptr<arrow::Array> Array__from_vector(
     SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_infered);
 
diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp
index 289c3a9..11d3d6a 100644
--- a/r/src/recordbatch.cpp
+++ b/r/src/recordbatch.cpp
@@ -148,6 +148,9 @@ std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__InputStream__Schema(
   return batch;
 }
 
+namespace arrow {
+namespace r {
+
 arrow::Status check_consistent_array_size(
     const std::vector<std::shared_ptr<arrow::Array>>& arrays, int64_t* num_rows) {
   if (arrays.size()) {
@@ -163,30 +166,69 @@ arrow::Status check_consistent_array_size(
   return arrow::Status::OK();
 }
 
+Status count_fields(SEXP lst, int* out) {
+  int res = 0;
+  R_xlen_t n = XLENGTH(lst);
+  SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+  for (R_xlen_t i = 0; i < n; i++) {
+    if (LENGTH(STRING_ELT(names, i)) > 0) {
+      ++res;
+    } else {
+      SEXP x = VECTOR_ELT(lst, i);
+      if (Rf_inherits(x, "data.frame")) {
+        res += XLENGTH(x);
+      } else {
+        return Status::RError(
+            "only data frames are allowed as unnamed arguments to be auto spliced");
+      }
+    }
+  }
+  *out = res;
+  return Status::OK();
+}
+
+}  // namespace r
+}  // namespace arrow
+
 std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays__known_schema(
     const std::shared_ptr<arrow::Schema>& schema, SEXP lst) {
-  R_xlen_t n_arrays = XLENGTH(lst);
-  if (schema->num_fields() != n_arrays) {
+  int num_fields;
+  STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
+
+  if (schema->num_fields() != num_fields) {
     Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied",
-               schema->num_fields(), n_arrays);
+               schema->num_fields(), num_fields);
   }
 
   // convert lst to a vector of arrow::Array
-  std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
+  std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
   SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
-  bool has_names = !Rf_isNull(names);
 
-  for (R_xlen_t i = 0; i < n_arrays; i++) {
-    if (has_names && schema->field(i)->name() != CHAR(STRING_ELT(names, i))) {
-      Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1,
-                 schema->field(i)->name(), CHAR(STRING_ELT(names, i)));
+  auto fill_array = [&arrays, &schema](int j, SEXP x, SEXP name) {
+    if (schema->field(j)->name() != CHAR(name)) {
+      Rcpp::stop("field at index %d has name '%s' != '%s'", j + 1,
+                 schema->field(j)->name(), CHAR(name));
+    }
+    arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), false);
+  };
+
+  for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+    SEXP name_i = STRING_ELT(names, i);
+    SEXP x_i = VECTOR_ELT(lst, i);
+
+    if (LENGTH(name_i) == 0) {
+      SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+      for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+        fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+      }
+    } else {
+      fill_array(j, x_i, name_i);
+      j++;
     }
-    arrays[i] =
-        arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false);
   }
 
   int64_t num_rows = 0;
-  STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+  STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));
   return arrow::RecordBatch::Make(schema, num_rows, arrays);
 }
 
@@ -197,38 +239,45 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP schema_sxp, SE
         arrow::r::extract<arrow::Schema>(schema_sxp), lst);
   }
 
-  R_xlen_t n_arrays = XLENGTH(lst);
+  int num_fields;
+  STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
 
   // convert lst to a vector of arrow::Array
-  std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
-  for (R_xlen_t i = 0; i < n_arrays; i++) {
-    arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue);
+  std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
+  std::vector<std::string> arrays_names(num_fields);
+  SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+
+  auto fill_array = [&arrays, &arrays_names](int j, SEXP x, SEXP name) {
+    arrays[j] = Array__from_vector(x, R_NilValue);
+    arrays_names[j] = CHAR(name);
+  };
+
+  for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+    SEXP name_i = STRING_ELT(names, i);
+    SEXP x_i = VECTOR_ELT(lst, i);
+    if (LENGTH(name_i) == 0) {
+      SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+      for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+        fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+      }
+    } else {
+      fill_array(j, x_i, name_i);
+      j++;
+    }
   }
 
   // generate schema from the types that have been infered
   std::shared_ptr<arrow::Schema> schema;
-  if (Rf_inherits(schema_sxp, "arrow::Schema")) {
-    schema = arrow::r::extract<arrow::Schema>(schema_sxp);
-  } else {
-    Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
-    std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
-    for (R_xlen_t i = 0; i < n_arrays; i++) {
-      fields[i] =
-          std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
-    }
-    schema = std::make_shared<arrow::Schema>(std::move(fields));
-  }
 
-  Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
-  std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
-  for (R_xlen_t i = 0; i < n_arrays; i++) {
-    fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
+  std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
+  for (R_xlen_t i = 0; i < num_fields; i++) {
+    fields[i] = std::make_shared<arrow::Field>(arrays_names[i], arrays[i]->type());
   }
   schema = std::make_shared<arrow::Schema>(std::move(fields));
 
   // check all sizes are the same
   int64_t num_rows = 0;
-  STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+  STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));
 
   return arrow::RecordBatch::Make(schema, num_rows, arrays);
 }
diff --git a/r/src/table.cpp b/r/src/table.cpp
index 2d9135b..1e958d0 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -117,54 +117,84 @@ std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp) {
     return tab;
   }
 
-  R_xlen_t n = XLENGTH(lst);
-  std::vector<std::shared_ptr<arrow::Column>> columns(n);
+  int num_fields;
+  STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));
+
+  std::vector<std::shared_ptr<arrow::Column>> columns(num_fields);
   std::shared_ptr<arrow::Schema> schema;
 
   if (Rf_isNull(schema_sxp)) {
     // infer the schema from the ...
-    std::vector<std::shared_ptr<arrow::Field>> fields(n);
-    Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
+    std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
+    SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
 
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP x = VECTOR_ELT(lst, i);
+    auto fill_one_column = [&columns, &fields](int j, SEXP x, SEXP name) {
       if (Rf_inherits(x, "arrow::Column")) {
-        columns[i] = arrow::r::extract<arrow::Column>(x);
-        fields[i] = columns[i]->field();
+        columns[j] = arrow::r::extract<arrow::Column>(x);
+        fields[j] = columns[j]->field();
       } else if (Rf_inherits(x, "arrow::ChunkedArray")) {
         auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
-        fields[i] =
-            std::make_shared<arrow::Field>(std::string(names[i]), chunked_array->type());
-        columns[i] = std::make_shared<arrow::Column>(fields[i], chunked_array);
+        fields[j] = std::make_shared<arrow::Field>(CHAR(name), chunked_array->type());
+        columns[j] = std::make_shared<arrow::Column>(fields[j], chunked_array);
       } else if (Rf_inherits(x, "arrow::Array")) {
         auto array = arrow::r::extract<arrow::Array>(x);
-        fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
-        columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+        fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
+        columns[j] = std::make_shared<arrow::Column>(fields[j], array);
       } else {
         auto array = Array__from_vector(x, R_NilValue);
-        fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
-        columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+        fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
+        columns[j] = std::make_shared<arrow::Column>(fields[j], array);
+      }
+    };
+
+    for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+      SEXP name_i = STRING_ELT(names, i);
+      SEXP x_i = VECTOR_ELT(lst, i);
+
+      if (LENGTH(name_i) == 0) {
+        SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
+        for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+          fill_one_column(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
+        }
+      } else {
+        fill_one_column(j, x_i, name_i);
+        j++;
       }
     }
+
     schema = std::make_shared<arrow::Schema>(std::move(fields));
   } else {
     // use the schema that is given
     schema = arrow::r::extract<arrow::Schema>(schema_sxp);
 
-    for (R_xlen_t i = 0; i < n; i++) {
-      SEXP x = VECTOR_ELT(lst, i);
+    auto fill_one_column = [&columns, &schema](int j, SEXP x) {
       if (Rf_inherits(x, "arrow::Column")) {
-        columns[i] = arrow::r::extract<arrow::Column>(x);
+        columns[j] = arrow::r::extract<arrow::Column>(x);
       } else if (Rf_inherits(x, "arrow::ChunkedArray")) {
         auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
-        columns[i] = std::make_shared<arrow::Column>(schema->field(i), chunked_array);
+        columns[j] = std::make_shared<arrow::Column>(schema->field(j), chunked_array);
       } else if (Rf_inherits(x, "arrow::Array")) {
         auto array = arrow::r::extract<arrow::Array>(x);
-        columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+        columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
       } else {
-        auto type = schema->field(i)->type();
+        auto type = schema->field(j)->type();
         auto array = arrow::r::Array__from_vector(x, type, false);
-        columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+        columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
+      }
+    };
+
+    SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+    for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
+      SEXP name_i = STRING_ELT(names, i);
+      SEXP x_i = VECTOR_ELT(lst, i);
+
+      if (LENGTH(name_i) == 0) {
+        for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
+          fill_one_column(j, VECTOR_ELT(x_i, k));
+        }
+      } else {
+        fill_one_column(j, x_i);
+        j++;
       }
     }
   }
diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R
index c77e1f1..d5a141c 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -153,10 +153,24 @@ test_that("record_batch() handles arrow::Array", {
 
 test_that("record_batch() handles data frame columns", {
   tib <- tibble::tibble(x = 1:10, y = 1:10)
+  # because tib is named here, this becomes a struct array named `b`
   batch <- record_batch(a = 1:10, b = tib)
-  expect_equal(batch$schema, schema(a = int32(), struct(x = int32(), y = int32())))
+  expect_equal(batch$schema,
+    schema(
+      a = int32(),
+      b = struct(x = int32(), y = int32())
+    )
+  )
   out <- as.data.frame(batch)
   expect_equivalent(out, tibble::tibble(a = 1:10, b = tib))
+
+  # if not named, columns from tib are auto spliced (check batch2, not batch)
+  batch2 <- record_batch(a = 1:10, tib)
+  expect_equal(batch2$schema,
+    schema(a = int32(), x = int32(), y = int32())
+  )
+  out <- as.data.frame(batch2)
+  expect_equivalent(out, tibble::tibble(a = 1:10, !!!tib))
 })
 
 test_that("record_batch() handles data frame columns with schema spec", {
@@ -170,3 +184,40 @@ test_that("record_batch() handles data frame columns with schema spec", {
   schema <- schema(a = int32(), b = struct(x = int16(), y = utf8()))
   expect_error(record_batch(a = 1:10, b = tib, schema = schema))
 })
+
+test_that("record_batch() auto splices (ARROW-5718)", {
+  df <- tibble::tibble(x = 1:10, y = letters[1:10])
+  batch1 <- record_batch(df)
+  batch2 <- record_batch(!!!df)
+  expect_equal(batch1, batch2)
+  expect_equal(batch1$schema, schema(x = int32(), y = utf8()))
+  expect_equivalent(as.data.frame(batch1), df)
+
+  batch3 <- record_batch(df, z = 1:10)
+  batch4 <- record_batch(!!!df, z = 1:10)
+  expect_equal(batch3, batch4)
+  expect_equal(batch3$schema, schema(x = int32(), y = utf8(), z = int32()))
+  expect_equivalent(as.data.frame(batch3), cbind(df, data.frame(z = 1:10)))
+
+  s <- schema(x = float64(), y = utf8())
+  batch5 <- record_batch(df, schema = s)
+  batch6 <- record_batch(!!!df, schema = s)
+  expect_equal(batch5, batch6)
+  expect_equal(batch5$schema, s)
+  expect_equivalent(as.data.frame(batch5), df)
+
+  s2 <- schema(x = float64(), y = utf8(), z = int16())
+  batch7 <- record_batch(df, z = 1:10, schema = s2)
+  batch8 <- record_batch(!!!df, z = 1:10, schema = s2)
+  expect_equal(batch7, batch8)
+  expect_equal(batch7$schema, s2)
+  expect_equivalent(as.data.frame(batch7), cbind(df, data.frame(z = 1:10)))
+})
+
+test_that("record_batch() only auto splice data frames", {
+  expect_error(
+    record_batch(1:10),
+    regexp = "only data frames are allowed as unnamed arguments to be auto spliced"
+  )
+})
+
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 56b0969..ae948f2 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -119,3 +119,21 @@ test_that("table() handles ... of arrays, chunked arrays, vectors", {
     tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10])
   )
 })
+
+test_that("table() auto splices (ARROW-5718)", {
+  df <- tibble::tibble(x = 1:10, y = letters[1:10])
+
+  tab1 <- table(df)
+  tab2 <- table(!!!df)
+  expect_equal(tab1, tab2)
+  expect_equal(tab1$schema, schema(x = int32(), y = utf8()))
+  expect_equivalent(as.data.frame(tab1), df)
+
+  s <- schema(x = float64(), y = utf8())
+  tab3 <- table(df, schema = s)
+  tab4 <- table(!!!df, schema = s)
+  expect_equal(tab3, tab4)
+  expect_equal(tab3$schema, s)
+  expect_equivalent(as.data.frame(tab3), df)
+})
+