You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/01/29 03:39:59 UTC
[arrow] branch master updated: ARROW-3761: [R] Bindings for
CompressedInputStream, CompressedOutputStream
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f576c3e ARROW-3761: [R] Bindings for CompressedInputStream, CompressedOutputStream
f576c3e is described below
commit f576c3ebe9159099eeef5c7229bb7041888109f6
Author: Romain Francois <ro...@purrple.cat>
AuthorDate: Mon Jan 28 21:39:54 2019 -0600
ARROW-3761: [R] Bindings for CompressedInputStream, CompressedOutputStream
Author: Romain Francois <ro...@purrple.cat>
Closes #2962 from romainfrancois/ARROW-3761/CompressedStream and squashes the following commits:
397baca5 <Romain Francois> rebasing
795cc71d <Romain Francois> rebase
45c692f8 <Romain Francois> adapt to changes from #3043
af3714b0 <Romain Francois> using size() until #3043 is merged
c696ff31 <Romain Francois> lint
08124a7e <Romain Francois> test reading and writing random bytes to Compressed streams
a3c5af4b <Romain Francois> align with changes from #3043
da10e94f <Romain Francois> + Compressed streams
---
r/DESCRIPTION | 3 +-
r/NAMESPACE | 11 +++++
r/R/RcppExports.R | 20 +++++++++
r/R/buffer.R | 5 +++
r/R/compression.R | 86 ++++++++++++++++++++++++++++++++++++++
r/R/enums.R | 6 +++
r/R/io.R | 9 +++-
r/man/CompressedInputStream.Rd | 16 +++++++
r/man/CompressedOutputStream.Rd | 16 +++++++
r/man/DataType.Rd | 3 ++
r/man/compression_codec.Rd | 14 +++++++
r/src/RcppExports.cpp | 62 +++++++++++++++++++++++++++
r/src/arrow_types.h | 3 ++
r/src/compression.cpp | 43 +++++++++++++++++++
r/src/io.cpp | 17 ++++++++
r/tests/testthat/test-compressed.R | 48 +++++++++++++++++++++
16 files changed, 359 insertions(+), 3 deletions(-)
diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index f5a904b..d994575 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -55,11 +55,12 @@ Collate:
'Table.R'
'array.R'
'buffer.R'
+ 'io.R'
+ 'compression.R'
'compute.R'
'csv.R'
'dictionary.R'
'feather.R'
- 'io.R'
'memory_pool.R'
'message.R'
'on_exit.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 7fd76c7..4636c12 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -8,6 +8,12 @@ S3method("==","arrow::RecordBatch")
S3method("==","arrow::ipc::Message")
S3method(BufferReader,"arrow::Buffer")
S3method(BufferReader,default)
+S3method(CompressedInputStream,"arrow::io::InputStream")
+S3method(CompressedInputStream,character)
+S3method(CompressedInputStream,fs_path)
+S3method(CompressedOutputStream,"arrow::io::OutputStream")
+S3method(CompressedOutputStream,character)
+S3method(CompressedOutputStream,fs_path)
S3method(FeatherTableReader,"arrow::io::RandomAccessFile")
S3method(FeatherTableReader,"arrow::ipc::feather::TableReader")
S3method(FeatherTableReader,character)
@@ -34,6 +40,7 @@ S3method(RecordBatchStreamWriter,character)
S3method(RecordBatchStreamWriter,fs_path)
S3method(as_tibble,"arrow::RecordBatch")
S3method(as_tibble,"arrow::Table")
+S3method(buffer,"arrow::Buffer")
S3method(buffer,complex)
S3method(buffer,default)
S3method(buffer,integer)
@@ -74,6 +81,9 @@ S3method(write_feather_RecordBatch,default)
S3method(write_feather_RecordBatch,fs_path)
export(BufferOutputStream)
export(BufferReader)
+export(CompressedInputStream)
+export(CompressedOutputStream)
+export(CompressionType)
export(DateUnit)
export(FeatherTableReader)
export(FeatherTableWriter)
@@ -99,6 +109,7 @@ export(boolean)
export(buffer)
export(cast_options)
export(chunked_array)
+export(compression_codec)
export(csv_convert_options)
export(csv_parse_options)
export(csv_read_options)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 51ed4ea..3d493c7 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -181,6 +181,18 @@ Column__data <- function(column) {
.Call(`_arrow_Column__data`, column)
}
+util___Codec__Create <- function(codec) {
+ .Call(`_arrow_util___Codec__Create`, codec)
+}
+
+io___CompressedOutputStream__Make <- function(codec, raw) {
+ .Call(`_arrow_io___CompressedOutputStream__Make`, codec, raw)
+}
+
+io___CompressedInputStream__Make <- function(codec, raw) {
+ .Call(`_arrow_io___CompressedInputStream__Make`, codec, raw)
+}
+
compute___CastOptions__initialize <- function(allow_int_overflow, allow_time_truncate, allow_float_truncate) {
.Call(`_arrow_compute___CastOptions__initialize`, allow_int_overflow, allow_time_truncate, allow_float_truncate)
}
@@ -553,6 +565,14 @@ io___BufferReader__initialize <- function(buffer) {
.Call(`_arrow_io___BufferReader__initialize`, buffer)
}
+io___Writable__write <- function(stream, buf) {
+ invisible(.Call(`_arrow_io___Writable__write`, stream, buf))
+}
+
+io___OutputStream__Tell <- function(stream) {
+ .Call(`_arrow_io___OutputStream__Tell`, stream)
+}
+
io___FileOutputStream__Open <- function(path) {
.Call(`_arrow_io___FileOutputStream__Open`, path)
}
diff --git a/r/R/buffer.R b/r/R/buffer.R
index 2fecd0e..ca9b2ee 100644
--- a/r/R/buffer.R
+++ b/r/R/buffer.R
@@ -81,3 +81,8 @@ buffer.complex <- function(x) {
shared_ptr(`arrow::Buffer`, r___RBuffer__initialize(x))
}
+#' @export
+`buffer.arrow::Buffer` <- function(x) {
+ x
+}
+
diff --git a/r/R/compression.R b/r/R/compression.R
new file mode 100644
index 0000000..083774a
--- /dev/null
+++ b/r/R/compression.R
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include enums.R
+#' @include R6.R
+#' @include io.R
+
+`arrow::util::Codec` <- R6Class("arrow::util::Codec", inherit = `arrow::Object`)
+
+`arrow::io::CompressedOutputStream` <- R6Class("arrow::io::CompressedOutputStream", inherit = `arrow::io::OutputStream`)
+`arrow::io::CompressedInputStream` <- R6Class("arrow::io::CompressedInputStream", inherit = `arrow::io::InputStream`)
+
+#' codec
+#'
+#' @param type type of codec
+#'
+#' @export
+compression_codec <- function(type = "GZIP") {
+ type <- CompressionType[[match.arg(type, names(CompressionType))]]
+ unique_ptr(`arrow::util::Codec`, util___Codec__Create(type))
+}
+
+
+#' Compressed output stream
+#'
+#' @param stream Underlying raw output stream
+#' @param codec a codec
+#' @export
+CompressedOutputStream <- function(stream, codec = compression_codec("GZIP")){
+ UseMethod("CompressedOutputStream")
+}
+
+#' @export
+CompressedOutputStream.character <- function(stream, codec = compression_codec("GZIP")){
+ CompressedOutputStream(fs::path_abs(stream), codec = codec)
+}
+
+#' @export
+CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){
+ CompressedOutputStream(FileOutputStream(stream), codec = codec)
+}
+
+#' @export
+`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec = compression_codec("GZIP")) {
+ assert_that(inherits(codec, "arrow::util::Codec"))
+ shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream))
+}
+
+#' Compressed input stream
+#'
+#' @param stream Underlying raw input stream
+#' @param codec a codec
+#' @export
+CompressedInputStream <- function(stream, codec = codec("GZIP")){
+ UseMethod("CompressedInputStream")
+}
+
+#' @export
+CompressedInputStream.character <- function(stream, codec = compression_codec("GZIP")){
+ CompressedInputStream(fs::path_abs(stream), codec = codec)
+}
+
+#' @export
+CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){
+ CompressedInputStream(ReadableFile(stream), codec = codec)
+}
+
+#' @export
+`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec = compression_codec("GZIP")) {
+ assert_that(inherits(codec, "arrow::util::Codec"))
+ shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream))
+}
diff --git a/r/R/enums.R b/r/R/enums.R
index 35e6aaa..3a6ac5c 100644
--- a/r/R/enums.R
+++ b/r/R/enums.R
@@ -70,3 +70,9 @@ FileMode <- enum("arrow::io::FileMode",
MessageType <- enum("arrow::ipc::Message::Type",
NONE = 0L, SCHEMA = 1L, DICTIONARY_BATCH = 2L, RECORD_BATCH = 3L, TENSOR = 4L
)
+
+#' @rdname DataType
+#' @export
+CompressionType <- enum("arrow::Compression::type",
+ UNCOMPRESSED = 0L, SNAPPY = 1L, GZIP = 2L, BROTLI = 3L, ZSTD = 4L, LZ4 = 5L, LZO = 6L, BZ2 = 7L
+)
diff --git a/r/R/io.R b/r/R/io.R
index b772be3..ad350f3 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -21,7 +21,11 @@
# OutputStream ------------------------------------------------------------
-`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`)
+`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`,
+ public = list(
+ write = function(x) io___Writable__write(self, buffer(x))
+ )
+)
#' @title OutputStream
#'
@@ -38,7 +42,8 @@
#' @name arrow__io__OutputStream
`arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit = `arrow::io::Writable`,
public = list(
- close = function() io___OutputStream__Close(self)
+ close = function() io___OutputStream__Close(self),
+ tell = function() io___OutputStream__Tell(self)
)
)
diff --git a/r/man/CompressedInputStream.Rd b/r/man/CompressedInputStream.Rd
new file mode 100644
index 0000000..cfff053
--- /dev/null
+++ b/r/man/CompressedInputStream.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{CompressedInputStream}
+\alias{CompressedInputStream}
+\title{Compressed input stream}
+\usage{
+CompressedInputStream(stream, codec = codec("GZIP"))
+}
+\arguments{
+\item{stream}{Underlying raw input stream}
+
+\item{codec}{a codec}
+}
+\description{
+Compressed input stream
+}
diff --git a/r/man/CompressedOutputStream.Rd b/r/man/CompressedOutputStream.Rd
new file mode 100644
index 0000000..85c4d92
--- /dev/null
+++ b/r/man/CompressedOutputStream.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{CompressedOutputStream}
+\alias{CompressedOutputStream}
+\title{Compressed output stream}
+\usage{
+CompressedOutputStream(stream, codec = compression_codec("GZIP"))
+}
+\arguments{
+\item{stream}{Underlying raw output stream}
+
+\item{codec}{a codec}
+}
+\description{
+Compressed output stream
+}
diff --git a/r/man/DataType.Rd b/r/man/DataType.Rd
index b104140..bf5f1d4 100644
--- a/r/man/DataType.Rd
+++ b/r/man/DataType.Rd
@@ -8,6 +8,7 @@
\alias{StatusCode}
\alias{FileMode}
\alias{MessageType}
+\alias{CompressionType}
\alias{int8}
\alias{int16}
\alias{int32}
@@ -45,6 +46,8 @@ FileMode
MessageType
+CompressionType
+
int8()
int16()
diff --git a/r/man/compression_codec.Rd b/r/man/compression_codec.Rd
new file mode 100644
index 0000000..a7db8ab
--- /dev/null
+++ b/r/man/compression_codec.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{compression_codec}
+\alias{compression_codec}
+\title{codec}
+\usage{
+compression_codec(type = "GZIP")
+}
+\arguments{
+\item{type}{type of codec}
+}
+\description{
+codec
+}
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index a31c401..e726f47 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -517,6 +517,41 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// util___Codec__Create
+std::unique_ptr<arrow::util::Codec> util___Codec__Create(arrow::Compression::type codec);
+RcppExport SEXP _arrow_util___Codec__Create(SEXP codecSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< arrow::Compression::type >::type codec(codecSEXP);
+ rcpp_result_gen = Rcpp::wrap(util___Codec__Create(codec));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___CompressedOutputStream__Make
+std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(const std::unique_ptr<arrow::util::Codec>& codec, const std::shared_ptr<arrow::io::OutputStream>& raw);
+RcppExport SEXP _arrow_io___CompressedOutputStream__Make(SEXP codecSEXP, SEXP rawSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::unique_ptr<arrow::util::Codec>& >::type codec(codecSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::OutputStream>& >::type raw(rawSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___CompressedOutputStream__Make(codec, raw));
+ return rcpp_result_gen;
+END_RCPP
+}
+// io___CompressedInputStream__Make
+std::shared_ptr<arrow::io::CompressedInputStream> io___CompressedInputStream__Make(const std::unique_ptr<arrow::util::Codec>& codec, const std::shared_ptr<arrow::io::InputStream>& raw);
+RcppExport SEXP _arrow_io___CompressedInputStream__Make(SEXP codecSEXP, SEXP rawSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::unique_ptr<arrow::util::Codec>& >::type codec(codecSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::InputStream>& >::type raw(rawSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___CompressedInputStream__Make(codec, raw));
+ return rcpp_result_gen;
+END_RCPP
+}
// compute___CastOptions__initialize
std::shared_ptr<arrow::compute::CastOptions> compute___CastOptions__initialize(bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate);
RcppExport SEXP _arrow_compute___CastOptions__initialize(SEXP allow_int_overflowSEXP, SEXP allow_time_truncateSEXP, SEXP allow_float_truncateSEXP) {
@@ -1550,6 +1585,28 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
+// io___Writable__write
+void io___Writable__write(const std::shared_ptr<arrow::io::Writable>& stream, const std::shared_ptr<arrow::Buffer>& buf);
+RcppExport SEXP _arrow_io___Writable__write(SEXP streamSEXP, SEXP bufSEXP) {
+BEGIN_RCPP
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::Writable>& >::type stream(streamSEXP);
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Buffer>& >::type buf(bufSEXP);
+ io___Writable__write(stream, buf);
+ return R_NilValue;
+END_RCPP
+}
+// io___OutputStream__Tell
+int64_t io___OutputStream__Tell(const std::shared_ptr<arrow::io::OutputStream>& stream);
+RcppExport SEXP _arrow_io___OutputStream__Tell(SEXP streamSEXP) {
+BEGIN_RCPP
+ Rcpp::RObject rcpp_result_gen;
+ Rcpp::RNGScope rcpp_rngScope_gen;
+ Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+ rcpp_result_gen = Rcpp::wrap(io___OutputStream__Tell(stream));
+ return rcpp_result_gen;
+END_RCPP
+}
// io___FileOutputStream__Open
std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(const std::string& path);
RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) {
@@ -2288,6 +2345,9 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1},
{"_arrow_Column__type", (DL_FUNC) &_arrow_Column__type, 1},
{"_arrow_Column__data", (DL_FUNC) &_arrow_Column__data, 1},
+ {"_arrow_util___Codec__Create", (DL_FUNC) &_arrow_util___Codec__Create, 1},
+ {"_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2},
+ {"_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2},
{"_arrow_compute___CastOptions__initialize", (DL_FUNC) &_arrow_compute___CastOptions__initialize, 3},
{"_arrow_Array__cast", (DL_FUNC) &_arrow_Array__cast, 3},
{"_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3},
@@ -2381,6 +2441,8 @@ static const R_CallMethodDef CallEntries[] = {
{"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC) &_arrow_io___MemoryMappedFile__Resize, 2},
{"_arrow_io___ReadableFile__Open", (DL_FUNC) &_arrow_io___ReadableFile__Open, 1},
{"_arrow_io___BufferReader__initialize", (DL_FUNC) &_arrow_io___BufferReader__initialize, 1},
+ {"_arrow_io___Writable__write", (DL_FUNC) &_arrow_io___Writable__write, 2},
+ {"_arrow_io___OutputStream__Tell", (DL_FUNC) &_arrow_io___OutputStream__Tell, 1},
{"_arrow_io___FileOutputStream__Open", (DL_FUNC) &_arrow_io___FileOutputStream__Open, 1},
{"_arrow_io___BufferOutputStream__Create", (DL_FUNC) &_arrow_io___BufferOutputStream__Create, 1},
{"_arrow_io___BufferOutputStream__capacity", (DL_FUNC) &_arrow_io___BufferOutputStream__capacity, 1},
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index a657731..4843f95 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -23,12 +23,14 @@
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/csv/reader.h>
+#include <arrow/io/compressed.h>
#include <arrow/io/file.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/feather.h>
#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include <arrow/type.h>
+#include <arrow/util/compression.h>
#define STOP_IF_NOT(TEST, MSG) \
do { \
@@ -129,6 +131,7 @@ RCPP_EXPOSED_ENUM_NODECL(arrow::TimeUnit::type)
RCPP_EXPOSED_ENUM_NODECL(arrow::StatusCode)
RCPP_EXPOSED_ENUM_NODECL(arrow::io::FileMode::type)
RCPP_EXPOSED_ENUM_NODECL(arrow::ipc::Message::Type)
+RCPP_EXPOSED_ENUM_NODECL(arrow::Compression::type)
namespace Rcpp {
namespace internal {
diff --git a/r/src/compression.cpp b/r/src/compression.cpp
new file mode 100644
index 0000000..4c522d8
--- /dev/null
+++ b/r/src/compression.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// [[Rcpp::export]]
+std::unique_ptr<arrow::util::Codec> util___Codec__Create(arrow::Compression::type codec) {
+ std::unique_ptr<arrow::util::Codec> out;
+ STOP_IF_NOT_OK(arrow::util::Codec::Create(codec, &out));
+ return out;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(
+ const std::unique_ptr<arrow::util::Codec>& codec,
+ const std::shared_ptr<arrow::io::OutputStream>& raw) {
+ std::shared_ptr<arrow::io::CompressedOutputStream> stream;
+ STOP_IF_NOT_OK(arrow::io::CompressedOutputStream::Make(codec.get(), raw, &stream));
+ return stream;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::CompressedInputStream> io___CompressedInputStream__Make(
+ const std::unique_ptr<arrow::util::Codec>& codec,
+ const std::shared_ptr<arrow::io::InputStream>& raw) {
+ std::shared_ptr<arrow::io::CompressedInputStream> stream;
+ STOP_IF_NOT_OK(arrow::io::CompressedInputStream::Make(codec.get(), raw, &stream));
+ return stream;
+}
diff --git a/r/src/io.cpp b/r/src/io.cpp
index b8d2d53..2f9fe30 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -115,6 +115,23 @@ std::shared_ptr<arrow::io::BufferReader> io___BufferReader__initialize(
return std::make_shared<arrow::io::BufferReader>(buffer);
}
+// ------- arrow::io::Writable
+
+// [[Rcpp::export]]
+void io___Writable__write(const std::shared_ptr<arrow::io::Writable>& stream,
+ const std::shared_ptr<arrow::Buffer>& buf) {
+ STOP_IF_NOT_OK(stream->Write(buf->data(), buf->size()));
+}
+
+// ------- arrow::io::OutputStream
+
+// [[Rcpp::export]]
+int64_t io___OutputStream__Tell(const std::shared_ptr<arrow::io::OutputStream>& stream) {
+ int64_t position;
+ STOP_IF_NOT_OK(stream->Tell(&position));
+ return position;
+}
+
// ------ arrow::io::FileOutputStream
// [[Rcpp::export]]
diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R
new file mode 100644
index 0000000..5ed0df8
--- /dev/null
+++ b/r/tests/testthat/test-compressed.R
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+context("arrow::io::Compressed.*Stream")
+
+test_that("can write Buffer to CompressedOutputStream and read back in CompressedInputStream", {
+ buf <- buffer(as.raw(sample(0:255, size = 1024, replace = TRUE)))
+
+ tf1 <- local_tempfile()
+ stream1 <- CompressedOutputStream(tf1)
+ stream1$write(buf)
+ expect_error(stream1$tell())
+ stream1$close()
+
+ tf2 <- local_tempfile()
+ sink2 <- FileOutputStream(tf2)
+ stream2 <- CompressedOutputStream(sink2)
+ stream2$write(buf)
+ expect_error(stream2$tell())
+ stream2$close()
+ sink2$close()
+
+
+ input1 <- CompressedInputStream(tf1)
+ buf1 <- input1$Read(1024L)
+
+ file2 <- ReadableFile(tf2)
+ input2 <- CompressedInputStream(file2)
+ buf2 <- input2$Read(1024L)
+
+ expect_equal(buf, buf1)
+ expect_equal(buf, buf2)
+})
+