You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ko...@apache.org on 2022/10/20 21:43:52 UTC

[arrow] 05/13: ARROW-18079: [R] Improve efficiency of schema creation to prevent performance regressions (#14447)

This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch maint-10.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 04ccd84bb88123a55134edb982cc695a5d5e0632
Author: Nic Crane <th...@gmail.com>
AuthorDate: Tue Oct 18 16:22:33 2022 +0100

    ARROW-18079: [R] Improve efficiency of schema creation to prevent performance regressions (#14447)
    
    Authored-by: Nic Crane <th...@gmail.com>
    Signed-off-by: Nic Crane <th...@gmail.com>
---
 r/NAMESPACE            |  1 +
 r/R/arrow-package.R    |  2 +-
 r/R/arrowExports.R     |  8 ++++++--
 r/R/dplyr-collect.R    |  6 ++++++
 r/R/dplyr-eval.R       |  3 ++-
 r/R/dplyr-select.R     |  3 ++-
 r/R/schema.R           |  6 +++---
 r/src/arrowExports.cpp | 17 +++++++++++++----
 r/src/schema.cpp       | 19 ++++++++++++++++++-
 9 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/r/NAMESPACE b/r/NAMESPACE
index f1f4bd8057..4a0c6ed261 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -412,6 +412,7 @@ importFrom(purrr,map_dfr)
 importFrom(purrr,map_int)
 importFrom(purrr,map_lgl)
 importFrom(purrr,reduce)
+importFrom(purrr,walk)
 importFrom(rlang,"%||%")
 importFrom(rlang,":=")
 importFrom(rlang,.data)
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 4c5067480a..1ab4e41a7a 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -18,7 +18,7 @@
 #' @importFrom stats quantile median na.omit na.exclude na.pass na.fail
 #' @importFrom R6 R6Class
 #' @importFrom purrr as_mapper map map2 map_chr map2_chr map_dbl map_dfr map_int map_lgl keep imap imap_chr
-#' @importFrom purrr flatten reduce
+#' @importFrom purrr flatten reduce walk
 #' @importFrom assertthat assert_that is.string
 #' @importFrom rlang list2 %||% is_false abort dots_n warn enquo quo_is_null enquos is_integerish quos quo
 #' @importFrom rlang eval_tidy new_data_mask syms env new_environment env_bind set_names exec
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a45bb7ae57..c42fca00b5 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -1876,8 +1876,12 @@ Scalar__ApproxEquals <- function(lhs, rhs) {
   .Call(`_arrow_Scalar__ApproxEquals`, lhs, rhs)
 }
 
-schema_ <- function(fields) {
-  .Call(`_arrow_schema_`, fields)
+Schema__from_fields <- function(fields) {
+  .Call(`_arrow_Schema__from_fields`, fields)
+}
+
+Schema__from_list <- function(field_list) {
+  .Call(`_arrow_Schema__from_list`, field_list)
 }
 
 Schema__ToString <- function(s) {
diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R
index be58031d96..4f8ffc7c1a 100644
--- a/r/R/dplyr-collect.R
+++ b/r/R/dplyr-collect.R
@@ -115,6 +115,12 @@ implicit_schema <- function(.data) {
   # want to go one level up (where we may have called implicit_schema() before)
   .data <- ensure_group_vars(.data)
   old_schm <- .data$.data$schema
+
+  if (is.null(.data$aggregations) && is.null(.data$join) && !needs_projection(.data$selected_columns, old_schm)) {
+    # Just use the schema we have
+    return(old_schm)
+  }
+
   # Add in any augmented fields that may exist in the query but not in the
   # real data, in case we have FieldRefs to them
   old_schm[["__filename"]] <- string()
diff --git a/r/R/dplyr-eval.R b/r/R/dplyr-eval.R
index a8fb7c4330..15618d01d9 100644
--- a/r/R/dplyr-eval.R
+++ b/r/R/dplyr-eval.R
@@ -95,8 +95,9 @@ arrow_mask <- function(.data, aggregation = FALSE) {
     }
   }
 
+  schema <- .data$.data$schema
   # Assign the schema to the expressions
-  map(.data$selected_columns, ~ (.$schema <- .data$.data$schema))
+  walk(.data$selected_columns, ~ (.$schema <- schema))
 
   # Add the column references and make the mask
   out <- new_data_mask(
diff --git a/r/R/dplyr-select.R b/r/R/dplyr-select.R
index 3a9d82f975..9b6d07d375 100644
--- a/r/R/dplyr-select.R
+++ b/r/R/dplyr-select.R
@@ -45,7 +45,8 @@ relocate.arrow_dplyr_query <- function(.data, ..., .before = NULL, .after = NULL
   .data <- as_adq(.data)
 
   # Assign the schema to the expressions
-  map(.data$selected_columns, ~ (.$schema <- .data$.data$schema))
+  schema <- .data$.data$schema
+  walk(.data$selected_columns, ~ (.$schema <- schema))
 
   # Create a mask for evaluating expressions in tidyselect helpers
   mask <- new_environment(.cache$functions, parent = caller_env())
diff --git a/r/R/schema.R b/r/R/schema.R
index c7e26652c9..93e826eff2 100644
--- a/r/R/schema.R
+++ b/r/R/schema.R
@@ -182,9 +182,9 @@ Schema$create <- function(...) {
   }
 
   if (all(map_lgl(.list, ~ inherits(., "Field")))) {
-    schema_(.list)
+    Schema__from_fields(.list)
   } else {
-    schema_(.fields(.list))
+    Schema__from_list(imap(.list, as_type))
   }
 }
 #' @include arrowExports.R
@@ -298,7 +298,7 @@ length.Schema <- function(x) x$num_fields
       call. = FALSE
     )
   }
-  schema_(fields)
+  Schema__from_fields(fields)
 }
 
 #' @export
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 91c3c6a235..cde8795c9f 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -4776,11 +4776,19 @@ BEGIN_CPP11
 END_CPP11
 }
 // schema.cpp
-std::shared_ptr<arrow::Schema> schema_(const std::vector<std::shared_ptr<arrow::Field>>& fields);
-extern "C" SEXP _arrow_schema_(SEXP fields_sexp){
+std::shared_ptr<arrow::Schema> Schema__from_fields(const std::vector<std::shared_ptr<arrow::Field>>& fields);
+extern "C" SEXP _arrow_Schema__from_fields(SEXP fields_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::vector<std::shared_ptr<arrow::Field>>&>::type fields(fields_sexp);
-	return cpp11::as_sexp(schema_(fields));
+	return cpp11::as_sexp(Schema__from_fields(fields));
+END_CPP11
+}
+// schema.cpp
+std::shared_ptr<arrow::Schema> Schema__from_list(cpp11::list field_list);
+extern "C" SEXP _arrow_Schema__from_list(SEXP field_list_sexp){
+BEGIN_CPP11
+	arrow::r::Input<cpp11::list>::type field_list(field_list_sexp);
+	return cpp11::as_sexp(Schema__from_list(field_list));
 END_CPP11
 }
 // schema.cpp
@@ -5695,7 +5703,8 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_Scalar__type", (DL_FUNC) &_arrow_Scalar__type, 1}, 
 		{ "_arrow_Scalar__Equals", (DL_FUNC) &_arrow_Scalar__Equals, 2}, 
 		{ "_arrow_Scalar__ApproxEquals", (DL_FUNC) &_arrow_Scalar__ApproxEquals, 2}, 
-		{ "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1}, 
+		{ "_arrow_Schema__from_fields", (DL_FUNC) &_arrow_Schema__from_fields, 1}, 
+		{ "_arrow_Schema__from_list", (DL_FUNC) &_arrow_Schema__from_list, 1}, 
 		{ "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1}, 
 		{ "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1}, 
 		{ "_arrow_Schema__field", (DL_FUNC) &_arrow_Schema__field, 2}, 
diff --git a/r/src/schema.cpp b/r/src/schema.cpp
index 2bc58f0fa3..0dac188ec0 100644
--- a/r/src/schema.cpp
+++ b/r/src/schema.cpp
@@ -22,11 +22,28 @@
 #include <arrow/util/key_value_metadata.h>
 
 // [[arrow::export]]
-std::shared_ptr<arrow::Schema> schema_(
+std::shared_ptr<arrow::Schema> Schema__from_fields(
     const std::vector<std::shared_ptr<arrow::Field>>& fields) {
   return arrow::schema(fields);
 }
 
+// [[arrow::export]]
+std::shared_ptr<arrow::Schema> Schema__from_list(cpp11::list field_list) {
+  int n = field_list.size();
+
+  bool nullable = true;
+  cpp11::strings names(field_list.attr(R_NamesSymbol));
+
+  std::vector<std::shared_ptr<arrow::Field>> fields(n);
+
+  for (int i = 0; i < n; i++) {
+    fields[i] = arrow::field(
+        names[i], cpp11::as_cpp<std::shared_ptr<arrow::DataType>>(field_list[i]),
+        nullable);
+  }
+  return arrow::schema(fields);
+}
+
 // [[arrow::export]]
 std::string Schema__ToString(const std::shared_ptr<arrow::Schema>& s) {
   return s->ToString();