You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/03 16:30:50 UTC
[arrow] branch master updated: ARROW-3814: [R] RecordBatch$from_arrays()
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 894b6e7 ARROW-3814: [R] RecordBatch$from_arrays()
894b6e7 is described below
commit 894b6e70d6e93d7c1f2af3abc4b3302cfa1f57c8
Author: Romain Francois <ro...@purrple.cat>
AuthorDate: Mon Jun 3 11:30:41 2019 -0500
ARROW-3814: [R] RecordBatch$from_arrays()
This started out as an implementation of `RecordBatch$from_arrays()` (ARROW-3814: https://issues.apache.org/jira/browse/ARROW-3814?filter=12344983) but now looks more like ARROW-3815: https://issues.apache.org/jira/browse/ARROW-3815?filter=12344983
The idea is that the record batch factory `record_batch()` would take `...` and `schema`, where each element of `...` could be:
- an `arrow::Array`
- an R vector that can be converted to an array using `array()`
So where we had this before:
```r
record_batch(tibble::tibble(x = 1:10, y = 1:10))
```
we would now have:
```r
record_batch(x = 1:10, y = 1:10)
```
We would still be able to start from a data frame by splicing its columns into `...` with `!!!`, e.g.:
```r
tbl <- tibble::tibble(x = 1:10, y = 1:10)
record_batch(!!!tbl)
```
So there would be no need for a `RecordBatch$from_arrays()` method.
Author: Romain Francois <ro...@purrple.cat>
Author: Wes McKinney <we...@apache.org>
Closes #3565 from romainfrancois/ARROW-3814/record_batch_from_arrays and squashes the following commits:
ab0cd1626 <Romain Francois> only pass $1 to run_clang_format so that we can do:
6ea077883 <Romain Francois> rebase
f27dcb9fe <Wes McKinney> Also run cpplint and clang-format on .cpp files
2362ea0e7 <Romain Francois> typo
eed535f6a <Romain Francois> add comments about !!!
efd84c506 <Romain Francois> record_batch(schema=) compares names
457494266 <Romain Francois> STOP_IF might be useful too
c04f904c9 <Romain Francois> tests about record_batch(schema=) argument
fc885fda7 <Romain Francois> directly return from builder_->Finish(), as suggested here: https://github.com/apache/arrow/pull/3635/files/08b295370271f122b410b991282b4919510b5cea#r261012517
d958108c9 <Romain Francois> record_batch(..., schema = )
d8e627fb4 <Romain Francois> use the schema= argument in table()
68744f5d2 <Romain Francois> tests for table(...<vectors, arrays, chunked arrays>)
7e8a4b7d8 <Romain Francois> test for table(...<batches>)
41b496fc0 <Romain Francois> table(...) can now handle ... being either:
c00774a0f <Romain Francois> table() factory also handles ... and !!! a schema, similar to record_batch()
c5ad62643 <Romain Francois> retire RecordBatch__from_dataframe() function, no longer needed and replaced by RecordBatch__from_arrays()
20c5ce671 <Romain Francois> move the logic of `RecordBatch__from_arrays` internally.
c71d87282 <Romain Francois> update docs
d49906af4 <Romain Francois> Change record_batch() api so that it takes ... and schema.
cd03e19ec <Romain Francois> + list_to_shared_ptr_vector
9a0f996e9 <Romain Francois> schema() supports tidy dots splicing, using rlang::list2
---
r/R/RcppExports.R | 16 ++---
r/R/RecordBatch.R | 9 ++-
r/R/Schema.R | 2 +-
r/R/Table.R | 11 +++-
r/R/feather.R | 7 ++-
r/R/write_arrow.R | 5 +-
r/lint.sh | 2 +-
r/man/record_batch.Rd | 6 +-
r/man/table.Rd | 6 +-
r/src/RcppExports.cpp | 50 ++++++++--------
r/src/array_from_vector.cpp | 7 +--
r/src/arrow_types.h | 24 ++++++--
r/src/recordbatch.cpp | 99 +++++++++++++++++++++++++------
r/src/table.cpp | 90 +++++++++++++++++++++++++---
r/tests/testthat/test-RecordBatch.R | 27 +++++++--
r/tests/testthat/test-Table.R | 44 +++++++++++++-
r/tests/testthat/test-arrow-csv-.R | 2 +-
r/tests/testthat/test-message.R | 2 +-
r/tests/testthat/test-messagereader.R | 4 +-
r/tests/testthat/test-read-write.R | 4 +-
r/tests/testthat/test-read_record_batch.R | 11 ++--
r/tests/testthat/test-recordbatchreader.R | 8 +--
r/tests/testthat/test-schema.R | 2 +-
23 files changed, 333 insertions(+), 105 deletions(-)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 81e1c04..e096122 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -693,10 +693,6 @@ RecordBatch__column <- function(batch, i) {
.Call(`_arrow_RecordBatch__column`, batch, i)
}
-RecordBatch__from_dataframe <- function(tbl) {
- .Call(`_arrow_RecordBatch__from_dataframe`, tbl)
-}
-
RecordBatch__Equals <- function(self, other) {
.Call(`_arrow_RecordBatch__Equals`, self, other)
}
@@ -729,6 +725,10 @@ ipc___ReadRecordBatch__InputStream__Schema <- function(stream, schema) {
.Call(`_arrow_ipc___ReadRecordBatch__InputStream__Schema`, stream, schema)
}
+RecordBatch__from_arrays <- function(schema_sxp, lst) {
+ .Call(`_arrow_RecordBatch__from_arrays`, schema_sxp, lst)
+}
+
RecordBatchReader__schema <- function(reader) {
.Call(`_arrow_RecordBatchReader__schema`, reader)
}
@@ -793,10 +793,6 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema) {
.Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema)
}
-Table__from_dataframe <- function(tbl) {
- .Call(`_arrow_Table__from_dataframe`, tbl)
-}
-
Table__num_columns <- function(x) {
.Call(`_arrow_Table__num_columns`, x)
}
@@ -817,6 +813,10 @@ Table__columns <- function(table) {
.Call(`_arrow_Table__columns`, table)
}
+Table__from_dots <- function(lst, schema_sxp) {
+ .Call(`_arrow_Table__from_dots`, lst, schema_sxp)
+}
+
#' Get the capacity of the global thread pool
#'
#' @return the number of worker threads in the thread pool to which
diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R
index 22fda84..2b9148a 100644
--- a/r/R/RecordBatch.R
+++ b/r/R/RecordBatch.R
@@ -91,10 +91,13 @@
#' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame
#'
-#' @param .data a data frame
+#' @param ... arrow::Array objects, or R vectors that can be converted with `array()`
+#' @param schema an arrow::Schema. The default (`NULL`) infers the schema from the `...`
#'
#' @return a [arrow::RecordBatch][arrow__RecordBatch]
#' @export
-record_batch <- function(.data){
- shared_ptr(`arrow::RecordBatch`, RecordBatch__from_dataframe(.data))
+record_batch <- function(..., schema = NULL){
+ arrays <- tibble::lst(...)
+ stopifnot(length(arrays) > 0)
+ shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays))
}
diff --git a/r/R/Schema.R b/r/R/Schema.R
index d57e274..fbf6581 100644
--- a/r/R/Schema.R
+++ b/r/R/Schema.R
@@ -61,7 +61,7 @@
#'
#' @export
schema <- function(...){
- shared_ptr(`arrow::Schema`, schema_(.fields(list(...))))
+ shared_ptr(`arrow::Schema`, schema_(.fields(list2(...))))
}
#' read a Schema from a stream
diff --git a/r/R/Table.R b/r/R/Table.R
index 54731ca..87e87ac 100644
--- a/r/R/Table.R
+++ b/r/R/Table.R
@@ -53,11 +53,16 @@
#' Create an arrow::Table from a data frame
#'
-#' @param .data a data frame
+#' @param ... arrays, chunked arrays, or R vectors
+#' @param schema a schema. The default (`NULL`) infers the schema from the `...`
+#'
+#' @return an arrow::Table
#'
#' @export
-table <- function(.data){
- shared_ptr(`arrow::Table`, Table__from_dataframe(.data))
+table <- function(..., schema = NULL){
+ dots <- tibble::lst(...)
+ stopifnot(length(dots) > 0)
+ shared_ptr(`arrow::Table`, Table__from_dots(dots, schema))
}
#' @export
diff --git a/r/R/feather.R b/r/R/feather.R
index 6e4b3a6..4a1d9de 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -72,7 +72,12 @@ write_feather.default <- function(data, stream) {
#' @export
write_feather.data.frame <- function(data, stream) {
- write_feather(record_batch(data), stream)
+ # splice the columns in the record_batch() call
+ # e.g. if we had data <- data.frame(x = <...>, y = <...>)
+ # then record_batch(!!!data) is the same as
+ # record_batch(x = data$x, y = data$y)
+ # see ?rlang::list2()
+ write_feather(record_batch(!!!data), stream)
}
#' @method write_feather arrow::RecordBatch
diff --git a/r/R/write_arrow.R b/r/R/write_arrow.R
index b979569..486f361 100644
--- a/r/R/write_arrow.R
+++ b/r/R/write_arrow.R
@@ -21,7 +21,10 @@ to_arrow <- function(x) {
`to_arrow.arrow::RecordBatch` <- function(x) x
`to_arrow.arrow::Table` <- function(x) x
-`to_arrow.data.frame` <- function(x) table(x)
+
+# splice the data frame as arguments of table()
+# see ?rlang::list2()
+`to_arrow.data.frame` <- function(x) table(!!!x)
#' serialize an [arrow::Table][arrow__Table], an [arrow::RecordBatch][arrow__RecordBatch], or a
#' data frame to either the streaming format or the binary file format
diff --git a/r/lint.sh b/r/lint.sh
index e0a8941..fed64c1 100755
--- a/r/lint.sh
+++ b/r/lint.sh
@@ -33,4 +33,4 @@ CPPLINT=$CPP_BUILD_SUPPORT/cpplint.py
$CPP_BUILD_SUPPORT/run_cpplint.py \
--cpplint_binary=$CPPLINT \
--exclude_glob=$CPP_BUILD_SUPPORT/lint_exclusions.txt \
- --source_dir=$SOURCE_DIR/src --quiet $1
+ --source_dir=$SOURCE_DIR/src --quiet
diff --git a/r/man/record_batch.Rd b/r/man/record_batch.Rd
index 4567a9a..a9680bf 100644
--- a/r/man/record_batch.Rd
+++ b/r/man/record_batch.Rd
@@ -4,10 +4,12 @@
\alias{record_batch}
\title{Create an \link[=arrow__RecordBatch]{arrow::RecordBatch} from a data frame}
\usage{
-record_batch(.data)
+record_batch(..., schema = NULL)
}
\arguments{
-\item{.data}{a data frame}
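+\item{...}{arrow::Array objects, or R vectors that can be converted with \code{array()}}
+
+\item{schema}{an arrow::Schema. The default (\code{NULL}) infers the schema from the \code{...}}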
}
\value{
a \link[=arrow__RecordBatch]{arrow::RecordBatch}
diff --git a/r/man/table.Rd b/r/man/table.Rd
index 743ee06..4d93ff3 100644
--- a/r/man/table.Rd
+++ b/r/man/table.Rd
@@ -4,10 +4,12 @@
\alias{table}
\title{Create an arrow::Table from a data frame}
\usage{
-table(.data)
+table(..., schema = NULL)
}
\arguments{
-\item{.data}{a data frame}
+\item{...}{arrays, chunked arrays, or R vectors}
+
+\item{schema}{a schema. The default (\code{NULL}) infers the schema from the \code{...}}
}
\description{
Create an arrow::Table from a data frame
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index 1ac96d4..a92c4c8 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -1940,17 +1940,6 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
-// RecordBatch__from_dataframe
-std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame tbl);
-RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< Rcpp::DataFrame >::type tbl(tblSEXP);
- rcpp_result_gen = Rcpp::wrap(RecordBatch__from_dataframe(tbl));
- return rcpp_result_gen;
-END_RCPP
-}
// RecordBatch__Equals
bool RecordBatch__Equals(const std::shared_ptr<arrow::RecordBatch>& self, const std::shared_ptr<arrow::RecordBatch>& other);
RcppExport SEXP _arrow_RecordBatch__Equals(SEXP selfSEXP, SEXP otherSEXP) {
@@ -2046,6 +2035,18 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// RecordBatch__from_arrays
+std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst);
+RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP);
+ Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP);
+ rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema_sxp, lst));
+ return rcpp_result_gen;
+END_RCPP
+}
// RecordBatchReader__schema
std::shared_ptr<arrow::Schema> RecordBatchReader__schema(const std::shared_ptr<arrow::RecordBatchReader>& reader);
RcppExport SEXP _arrow_RecordBatchReader__schema(SEXP readerSEXP) {
@@ -2224,17 +2225,6 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
-// Table__from_dataframe
-std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl);
-RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) {
-BEGIN_RCPP
- Rcpp::RObject rcpp_result_gen;
- Rcpp::RNGScope rcpp_rngScope_gen;
- Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP);
- rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl));
- return rcpp_result_gen;
-END_RCPP
-}
// Table__num_columns
int Table__num_columns(const std::shared_ptr<arrow::Table>& x);
RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) {
@@ -2291,6 +2281,18 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// Table__from_dots
+std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp);
+RcppExport SEXP _arrow_Table__from_dots(SEXP lstSEXP, SEXP schema_sxpSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP);
+ Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP);
+ rcpp_result_gen = Rcpp::wrap(Table__from_dots(lst, schema_sxp));
+ return rcpp_result_gen;
+END_RCPP
+}
// GetCpuThreadPoolCapacity
int GetCpuThreadPoolCapacity();
RcppExport SEXP _arrow_GetCpuThreadPoolCapacity() {
@@ -2486,7 +2488,6 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1},
{"_arrow_RecordBatch__columns", (DL_FUNC) &_arrow_RecordBatch__columns, 1},
{"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2},
- {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) &_arrow_RecordBatch__from_dataframe, 1},
{"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2},
{"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) &_arrow_RecordBatch__RemoveColumn, 2},
{"_arrow_RecordBatch__column_name", (DL_FUNC) &_arrow_RecordBatch__column_name, 2},
@@ -2495,6 +2496,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_RecordBatch__Slice2", (DL_FUNC) &_arrow_RecordBatch__Slice2, 3},
{"_arrow_ipc___SerializeRecordBatch__Raw", (DL_FUNC) &_arrow_ipc___SerializeRecordBatch__Raw, 1},
{"_arrow_ipc___ReadRecordBatch__InputStream__Schema", (DL_FUNC) &_arrow_ipc___ReadRecordBatch__InputStream__Schema, 2},
+ {"_arrow_RecordBatch__from_arrays", (DL_FUNC) &_arrow_RecordBatch__from_arrays, 2},
{"_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1},
{"_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1},
{"_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1},
@@ -2511,12 +2513,12 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1},
{"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2},
{"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2},
- {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1},
{"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1},
{"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1},
{"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1},
{"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2},
{"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1},
+ {"_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2},
{"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0},
{"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1},
{NULL, NULL, 0}
diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp
index 509a39a..e2b82ac 100644
--- a/r/src/array_from_vector.cpp
+++ b/r/src/array_from_vector.cpp
@@ -270,8 +270,7 @@ class VectorConverter {
virtual Status Ingest(SEXP obj) = 0;
virtual Status GetResult(std::shared_ptr<arrow::Array>* result) {
- RETURN_NOT_OK(builder_->Finish(result));
- return Status::OK();
+ return builder_->Finish(result);
}
ArrayBuilder* builder() const { return builder_; }
@@ -299,7 +298,7 @@ struct Unbox<Type, enable_if_integer<Type>> {
return IngestRange<int64_t>(builder, reinterpret_cast<int64_t*>(REAL(obj)),
XLENGTH(obj), NA_INT64);
}
- // TODO: handle aw and logical
+ // TODO: handle raw and logical
default:
break;
}
@@ -881,7 +880,7 @@ std::shared_ptr<Array> MakeSimpleArray(SEXP x) {
}
auto data = ArrayData::Make(std::make_shared<Type>(), LENGTH(x), std::move(buffers),
- null_count, 0);
+ null_count, 0 /*offset*/);
// return the right Array class
return std::make_shared<typename TypeTraits<Type>::ArrayType>(data);
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index 32f54cb..f4dad1b 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -19,6 +19,7 @@
#include <limits>
#include <memory>
+#include <vector>
#include <RcppCommon.h>
@@ -35,11 +36,12 @@
#include <arrow/type.h>
#include <arrow/util/compression.h>
-#define STOP_IF_NOT(TEST, MSG) \
- do { \
- if (!(TEST)) Rcpp::stop(MSG); \
+#define STOP_IF(TEST, MSG) \
+ do { \
+ if (TEST) Rcpp::stop(MSG); \
} while (0)
+#define STOP_IF_NOT(TEST, MSG) STOP_IF(!(TEST), MSG)
#define STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString())
template <typename T>
@@ -178,8 +180,7 @@ inline constexpr Rbyte default_value<RAWSXP>() {
SEXP ChunkedArray__as_vector(const std::shared_ptr<arrow::ChunkedArray>& chunked_array);
SEXP Array__as_vector(const std::shared_ptr<arrow::Array>& array);
std::shared_ptr<arrow::Array> Array__from_vector(SEXP x, SEXP type);
-std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame tbl);
-std::shared_ptr<arrow::DataType> Array__infer_type(SEXP x);
+std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP, SEXP);
namespace arrow {
namespace r {
@@ -207,5 +208,18 @@ inline std::shared_ptr<T> extract(SEXP x) {
return Rcpp::ConstReferenceSmartPtrInputParameter<std::shared_ptr<T>>(x);
}
+template <typename T>
+std::vector<std::shared_ptr<T>> list_to_shared_ptr_vector(SEXP lst) {
+ R_xlen_t n = XLENGTH(lst);
+ std::vector<std::shared_ptr<T>> res(n);
+ for (R_xlen_t i = 0; i < n; i++) {
+ res[i] = extract<T>(VECTOR_ELT(lst, i));
+ }
+ return res;
+}
+
+std::shared_ptr<arrow::Array> Array__from_vector(
+ SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_infered);
+
} // namespace r
} // namespace arrow
diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp
index 31fefa8..3901438 100644
--- a/r/src/recordbatch.cpp
+++ b/r/src/recordbatch.cpp
@@ -55,24 +55,6 @@ std::shared_ptr<arrow::Array> RecordBatch__column(
}
// [[Rcpp::export]]
-std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame tbl) {
- Rcpp::CharacterVector names = tbl.names();
-
- std::vector<std::shared_ptr<arrow::Field>> fields;
- std::vector<std::shared_ptr<arrow::Array>> arrays;
-
- for (int i = 0; i < tbl.size(); i++) {
- SEXP x = tbl[i];
- arrays.push_back(Array__from_vector(x, R_NilValue));
- fields.push_back(
- std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type()));
- }
- auto schema = std::make_shared<arrow::Schema>(std::move(fields));
-
- return arrow::RecordBatch::Make(schema, tbl.nrow(), std::move(arrays));
-}
-
-// [[Rcpp::export]]
bool RecordBatch__Equals(const std::shared_ptr<arrow::RecordBatch>& self,
const std::shared_ptr<arrow::RecordBatch>& other) {
return self->Equals(*other);
@@ -145,3 +127,84 @@ std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__InputStream__Schema(
STOP_IF_NOT_OK(arrow::ipc::ReadRecordBatch(schema, &memo, stream.get(), &batch));
return batch;
}
+
+arrow::Status check_consistent_array_size(
+ const std::vector<std::shared_ptr<arrow::Array>>& arrays, int64_t* num_rows) {
+ *num_rows = arrays[0]->length();
+ for (int64_t i = 1; i < arrays.size(); i++) {
+ if (arrays[i]->length() != *num_rows) {
+ return arrow::Status::Invalid("All arrays must have the same length");
+ }
+ }
+ return arrow::Status::OK();
+}
+
+std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays__known_schema(
+ const std::shared_ptr<arrow::Schema>& schema, SEXP lst) {
+ R_xlen_t n_arrays = XLENGTH(lst);
+ if (schema->num_fields() != n_arrays) {
+ Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied",
+ schema->num_fields(), n_arrays);
+ }
+
+ // convert lst to a vector of arrow::Array
+ std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
+ SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
+ bool has_names = !Rf_isNull(names);
+
+ for (R_xlen_t i = 0; i < n_arrays; i++) {
+ if (has_names && schema->field(i)->name() != CHAR(STRING_ELT(names, i))) {
+ Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1,
+ schema->field(i)->name(), CHAR(STRING_ELT(names, i)));
+ }
+ arrays[i] =
+ arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false);
+ }
+
+ int64_t num_rows;
+ STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+ return arrow::RecordBatch::Make(schema, num_rows, arrays);
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) {
+ if (Rf_inherits(schema_sxp, "arrow::Schema")) {
+ return RecordBatch__from_arrays__known_schema(
+ arrow::r::extract<arrow::Schema>(schema_sxp), lst);
+ }
+
+ R_xlen_t n_arrays = XLENGTH(lst);
+
+ // convert lst to a vector of arrow::Array
+ std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
+ for (R_xlen_t i = 0; i < n_arrays; i++) {
+ arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue);
+ }
+
+ // generate schema from the types that have been infered
+ std::shared_ptr<arrow::Schema> schema;
+ if (Rf_inherits(schema_sxp, "arrow::Schema")) {
+ schema = arrow::r::extract<arrow::Schema>(schema_sxp);
+ } else {
+ Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
+ std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
+ for (R_xlen_t i = 0; i < n_arrays; i++) {
+ fields[i] =
+ std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
+ }
+ schema = std::make_shared<arrow::Schema>(std::move(fields));
+ }
+
+ Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
+ std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
+ for (R_xlen_t i = 0; i < n_arrays; i++) {
+ fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
+ }
+ schema = std::make_shared<arrow::Schema>(std::move(fields));
+
+ // check all sizes are the same
+ int64_t num_rows;
+ STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
+
+ return arrow::RecordBatch::Make(schema, num_rows, arrays);
+}
diff --git a/r/src/table.cpp b/r/src/table.cpp
index c04e1d3..f78b2af 100644
--- a/r/src/table.cpp
+++ b/r/src/table.cpp
@@ -23,15 +23,6 @@
using Rcpp::DataFrame;
// [[Rcpp::export]]
-std::shared_ptr<arrow::Table> Table__from_dataframe(DataFrame tbl) {
- auto rb = RecordBatch__from_dataframe(tbl);
-
- std::shared_ptr<arrow::Table> out;
- STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out));
- return out;
-}
-
-// [[Rcpp::export]]
int Table__num_columns(const std::shared_ptr<arrow::Table>& x) {
return x->num_columns();
}
@@ -60,3 +51,84 @@ std::vector<std::shared_ptr<arrow::Column>> Table__columns(
}
return res;
}
+
+bool all_record_batches(SEXP lst) {
+ R_xlen_t n = XLENGTH(lst);
+ for (R_xlen_t i = 0; i < n; i++) {
+ if (!Rf_inherits(VECTOR_ELT(lst, i), "arrow::RecordBatch")) return false;
+ }
+ return true;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp) {
+ // lst can be either:
+ // - a list of record batches, in which case we call Table::FromRecordBatches
+
+ if (all_record_batches(lst)) {
+ auto batches = arrow::r::list_to_shared_ptr_vector<arrow::RecordBatch>(lst);
+ std::shared_ptr<arrow::Table> tab;
+
+ if (Rf_inherits(schema_sxp, "arrow::Schema")) {
+ auto schema = arrow::r::extract<arrow::Schema>(schema_sxp);
+ STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &tab));
+ } else {
+ STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &tab));
+ }
+ return tab;
+ }
+
+ R_xlen_t n = XLENGTH(lst);
+ std::vector<std::shared_ptr<arrow::Column>> columns(n);
+ std::shared_ptr<arrow::Schema> schema;
+
+ if (Rf_isNull(schema_sxp)) {
+ // infer the schema from the ...
+ std::vector<std::shared_ptr<arrow::Field>> fields(n);
+ Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
+
+ for (R_xlen_t i = 0; i < n; i++) {
+ SEXP x = VECTOR_ELT(lst, i);
+ if (Rf_inherits(x, "arrow::Column")) {
+ columns[i] = arrow::r::extract<arrow::Column>(x);
+ fields[i] = columns[i]->field();
+ } else if (Rf_inherits(x, "arrow::ChunkedArray")) {
+ auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
+ fields[i] =
+ std::make_shared<arrow::Field>(std::string(names[i]), chunked_array->type());
+ columns[i] = std::make_shared<arrow::Column>(fields[i], chunked_array);
+ } else if (Rf_inherits(x, "arrow::Array")) {
+ auto array = arrow::r::extract<arrow::Array>(x);
+ fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
+ columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+ } else {
+ auto array = Array__from_vector(x, R_NilValue);
+ fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
+ columns[i] = std::make_shared<arrow::Column>(fields[i], array);
+ }
+ }
+ schema = std::make_shared<arrow::Schema>(std::move(fields));
+ } else {
+ // use the schema that is given
+ schema = arrow::r::extract<arrow::Schema>(schema_sxp);
+
+ for (R_xlen_t i = 0; i < n; i++) {
+ SEXP x = VECTOR_ELT(lst, i);
+ if (Rf_inherits(x, "arrow::Column")) {
+ columns[i] = arrow::r::extract<arrow::Column>(x);
+ } else if (Rf_inherits(x, "arrow::ChunkedArray")) {
+ auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
+ columns[i] = std::make_shared<arrow::Column>(schema->field(i), chunked_array);
+ } else if (Rf_inherits(x, "arrow::Array")) {
+ auto array = arrow::r::extract<arrow::Array>(x);
+ columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+ } else {
+ auto type = schema->field(i)->type();
+ auto array = arrow::r::Array__from_vector(x, type, false);
+ columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
+ }
+ }
+ }
+
+ return arrow::Table::Make(schema, columns);
+}
diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R
index da09984..c0295ba 100644
--- a/r/tests/testthat/test-RecordBatch.R
+++ b/r/tests/testthat/test-RecordBatch.R
@@ -24,7 +24,7 @@ test_that("RecordBatch", {
chr = letters[1:10],
fct = factor(letters[1:10])
)
- batch <- record_batch(tbl)
+ batch <- record_batch(!!!tbl)
expect_true(batch == batch)
expect_equal(
@@ -93,7 +93,7 @@ test_that("RecordBatch with 0 rows are supported", {
fct = factor(character(), levels = c("a", "b"))
)
- batch <- record_batch(tbl)
+ batch <- record_batch(!!!tbl)
expect_equal(batch$num_columns, 5L)
expect_equal(batch$num_rows, 0L)
expect_equal(
@@ -109,7 +109,7 @@ test_that("RecordBatch with 0 rows are supported", {
})
test_that("RecordBatch cast (ARROW-3741)", {
- batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10))
+ batch <- record_batch(x = 1:10, y = 1:10)
expect_error(batch$cast(schema(x = int32())))
expect_error(batch$cast(schema(x = int32(), z = int32())))
@@ -121,8 +121,27 @@ test_that("RecordBatch cast (ARROW-3741)", {
expect_equal(batch2$column(1L)$type, int64())
})
+test_that("record_batch() handles schema= argument", {
+ s <- schema(x = int32(), y = int32())
+ batch <- record_batch(x = 1:10, y = 1:10, schema = s)
+ expect_equal(s, batch$schema)
+
+ s <- schema(x = int32(), y = float64())
+ batch <- record_batch(x = 1:10, y = 1:10, schema = s)
+ expect_equal(s, batch$schema)
+
+ s <- schema(x = int32(), y = utf8())
+ expect_error(record_batch(x = 1:10, y = 1:10, schema = s))
+})
+
+test_that("record_batch(schema=) does some basic consistency checking of the schema", {
+ s <- schema(x = int32())
+ expect_error(record_batch(x = 1:10, y = 1:10, schema = s))
+ expect_error(record_batch(z = 1:10, schema = s))
+})
+
test_that("RecordBatch dim() and nrow() (ARROW-3816)", {
- batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10))
+ batch <- record_batch(x = 1:10, y = 1:10)
expect_equal(dim(batch), c(10L, 2L))
expect_equal(nrow(batch), 10L)
})
diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R
index 346ce4a..59234fa 100644
--- a/r/tests/testthat/test-Table.R
+++ b/r/tests/testthat/test-Table.R
@@ -23,7 +23,7 @@ test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", {
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
)
- tab <- arrow::table(tbl)
+ tab <- arrow::table(!!!tbl)
tf <- tempfile()
write_arrow(tab, tf)
@@ -64,7 +64,7 @@ test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", {
})
test_that("Table cast (ARROW-3741)", {
- tab <- table(tibble::tibble(x = 1:10, y = 1:10))
+ tab <- table(x = 1:10, y = 1:10)
expect_error(tab$cast(schema(x = int32())))
expect_error(tab$cast(schema(x = int32(), z = int32())))
@@ -77,7 +77,45 @@ test_that("Table cast (ARROW-3741)", {
})
test_that("Table dim() and nrow() (ARROW-3816)", {
- tab <- table(tibble::tibble(x = 1:10, y = 1:10))
+ tab <- table(x = 1:10, y = 1:10)
expect_equal(dim(tab), c(10L, 2L))
expect_equal(nrow(tab), 10L)
})
+
+test_that("table() handles record batches with splicing", {
+ batch <- record_batch(x = 1:2, y = letters[1:2])
+ tab <- table(batch, batch, batch)
+ expect_equal(tab$schema, batch$schema)
+ expect_equal(tab$num_rows, 6L)
+ expect_equal(
+ as_tibble(tab),
+ vctrs::vec_rbind(as_tibble(batch), as_tibble(batch), as_tibble(batch))
+ )
+
+ batches <- list(batch, batch, batch)
+ tab <- table(!!!batches)
+ expect_equal(tab$schema, batch$schema)
+ expect_equal(tab$num_rows, 6L)
+ expect_equal(
+ as_tibble(tab),
+ vctrs::vec_rbind(!!!purrr::map(batches, as_tibble))
+ )
+})
+
+test_that("table() handles ... of arrays, chunked arrays, vectors", {
+ a <- array(1:10)
+ ca <- chunked_array(1:5, 6:10)
+ v <- rnorm(10)
+ tbl <- tibble::tibble(x = 1:10, y = letters[1:10])
+
+ tab <- table(a = a, b = ca, c = v, !!!tbl)
+ expect_equal(
+ tab$schema,
+ schema(a = int32(), b = int32(), c = float64(), x = int32(), y = utf8())
+ )
+ res <- as_tibble(tab)
+ expect_equal(names(res), c("a", "b", "c", "x", "y"))
+ expect_equal(res,
+ tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10])
+ )
+})
diff --git a/r/tests/testthat/test-arrow-csv-.R b/r/tests/testthat/test-arrow-csv-.R
index 27b4d53..ee09b60 100644
--- a/r/tests/testthat/test-arrow-csv-.R
+++ b/r/tests/testthat/test-arrow-csv-.R
@@ -27,7 +27,7 @@ test_that("Can read csv file", {
tab3 <- read_csv_arrow(ReadableFile(tf))
iris$Species <- as.character(iris$Species)
- tab0 <- table(iris)
+ tab0 <- table(!!!iris)
expect_equal(tab0, tab1)
expect_equal(tab0, tab2)
expect_equal(tab0, tab3)
diff --git a/r/tests/testthat/test-message.R b/r/tests/testthat/test-message.R
index 3fe5829..4cbf87d 100644
--- a/r/tests/testthat/test-message.R
+++ b/r/tests/testthat/test-message.R
@@ -18,7 +18,7 @@
context("arrow::ipc::Message")
test_that("read_message can read from input stream", {
- batch <- record_batch(tibble::tibble(x = 1:10))
+ batch <- record_batch(x = 1:10)
bytes <- batch$serialize()
stream <- BufferReader(bytes)
diff --git a/r/tests/testthat/test-messagereader.R b/r/tests/testthat/test-messagereader.R
index 5ff8277..690228d 100644
--- a/r/tests/testthat/test-messagereader.R
+++ b/r/tests/testthat/test-messagereader.R
@@ -18,7 +18,7 @@
context("arrow::ipc::MessageReader")
test_that("MessageReader can be created from raw vectors", {
- batch <- record_batch(tibble::tibble(x = 1:10))
+ batch <- record_batch(x = 1:10)
bytes <- batch$serialize()
reader <- MessageReader(bytes)
@@ -34,7 +34,7 @@ test_that("MessageReader can be created from raw vectors", {
})
test_that("MessageReader can be created from input stream", {
- batch <- record_batch(tibble::tibble(x = 1:10))
+ batch <- record_batch(x = 1:10)
bytes <- batch$serialize()
stream <- BufferReader(bytes)
diff --git a/r/tests/testthat/test-read-write.R b/r/tests/testthat/test-read-write.R
index 3fe07cd..c56a7d3 100644
--- a/r/tests/testthat/test-read-write.R
+++ b/r/tests/testthat/test-read-write.R
@@ -24,7 +24,7 @@ test_that("arrow::table round trip", {
raw = as.raw(1:10)
)
- tab <- arrow::table(tbl)
+ tab <- arrow::table(!!!tbl)
expect_equal(tab$num_columns, 3L)
expect_equal(tab$num_rows, 10L)
@@ -99,7 +99,7 @@ test_that("arrow::table round trip handles NA in integer and numeric", {
raw = as.raw(1:10)
)
- tab <- arrow::table(tbl)
+ tab <- arrow::table(!!!tbl)
expect_equal(tab$num_columns, 3L)
expect_equal(tab$num_rows, 10L)
diff --git a/r/tests/testthat/test-read_record_batch.R b/r/tests/testthat/test-read_record_batch.R
index 2618fae..adbb192 100644
--- a/r/tests/testthat/test-read_record_batch.R
+++ b/r/tests/testthat/test-read_record_batch.R
@@ -18,11 +18,12 @@
context("read_record_batch()")
test_that("RecordBatchFileWriter / RecordBatchFileReader roundtrips", {
- tab <- table(tibble::tibble(
- int = 1:10, dbl = as.numeric(1:10),
+ tab <- table(
+ int = 1:10,
+ dbl = as.numeric(1:10),
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
- ))
+ )
tf <- tempfile()
writer <- RecordBatchFileWriter(tf, tab$schema)
@@ -48,7 +49,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3
lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE),
chr = letters[1:10]
)
- batch <- record_batch(tbl)
+ batch <- record_batch(!!!tbl)
schema <- batch$schema
raw <- batch$serialize()
@@ -64,7 +65,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3
})
test_that("read_record_batch() can handle (Message, Schema) parameters (ARROW-3499)", {
- batch <- record_batch(tibble::tibble(x = 1:10))
+ batch <- record_batch(x = 1:10)
schema <- batch$schema
raw <- batch$serialize()
diff --git a/r/tests/testthat/test-recordbatchreader.R b/r/tests/testthat/test-recordbatchreader.R
index d2b6a09..65f7933 100644
--- a/r/tests/testthat/test-recordbatchreader.R
+++ b/r/tests/testthat/test-recordbatchreader.R
@@ -18,10 +18,10 @@
context("arrow::RecordBatch.*(Reader|Writer)")
test_that("RecordBatchStreamReader / Writer", {
- batch <- record_batch(tibble::tibble(
+ batch <- record_batch(
x = 1:10,
y = letters[1:10]
- ))
+ )
sink <- BufferOutputStream()
writer <- RecordBatchStreamWriter(sink, batch$schema)
@@ -43,10 +43,10 @@ test_that("RecordBatchStreamReader / Writer", {
})
test_that("RecordBatchFileReader / Writer", {
- batch <- record_batch(tibble::tibble(
+ batch <- record_batch(
x = 1:10,
y = letters[1:10]
- ))
+ )
sink <- BufferOutputStream()
writer <- RecordBatchFileWriter(sink, batch$schema)
diff --git a/r/tests/testthat/test-schema.R b/r/tests/testthat/test-schema.R
index 2f2d3ee..ff40b81 100644
--- a/r/tests/testthat/test-schema.R
+++ b/r/tests/testthat/test-schema.R
@@ -20,7 +20,7 @@ context("arrow::Schema")
test_that("reading schema from Buffer", {
# TODO: this uses the streaming format, i.e. from RecordBatchStreamWriter
# maybe there is an easier way to serialize a schema
- batch <- record_batch(tibble::tibble(x = 1:10))
+ batch <- record_batch(x = 1:10)
expect_is(batch, "arrow::RecordBatch")
stream <- BufferOutputStream()