You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/01/29 03:39:59 UTC

[arrow] branch master updated: ARROW-3761: [R] Bindings for CompressedInputStream, CompressedOutputStream

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f576c3e  ARROW-3761: [R] Bindings for CompressedInputStream, CompressedOutputStream
f576c3e is described below

commit f576c3ebe9159099eeef5c7229bb7041888109f6
Author: Romain Francois <ro...@purrple.cat>
AuthorDate: Mon Jan 28 21:39:54 2019 -0600

    ARROW-3761: [R] Bindings for CompressedInputStream, CompressedOutputStream
    
    Author: Romain Francois <ro...@purrple.cat>
    
    Closes #2962 from romainfrancois/ARROW-3761/CompressedStream and squashes the following commits:
    
    397baca5 <Romain Francois> rebasing
    795cc71d <Romain Francois> rebase
    45c692f8 <Romain Francois> adapt to changes from #3043
    af3714b0 <Romain Francois> using size() until #3043 is merged
    c696ff31 <Romain Francois> lint
    08124a7e <Romain Francois> test reading and writing random bytes to Compressed streams
    a3c5af4b <Romain Francois> align with changes from #3043
    da10e94f <Romain Francois> + Compressed streams
---
 r/DESCRIPTION                      |  3 +-
 r/NAMESPACE                        | 11 +++++
 r/R/RcppExports.R                  | 20 +++++++++
 r/R/buffer.R                       |  5 +++
 r/R/compression.R                  | 86 ++++++++++++++++++++++++++++++++++++++
 r/R/enums.R                        |  6 +++
 r/R/io.R                           |  9 +++-
 r/man/CompressedInputStream.Rd     | 16 +++++++
 r/man/CompressedOutputStream.Rd    | 16 +++++++
 r/man/DataType.Rd                  |  3 ++
 r/man/compression_codec.Rd         | 14 +++++++
 r/src/RcppExports.cpp              | 62 +++++++++++++++++++++++++++
 r/src/arrow_types.h                |  3 ++
 r/src/compression.cpp              | 43 +++++++++++++++++++
 r/src/io.cpp                       | 17 ++++++++
 r/tests/testthat/test-compressed.R | 48 +++++++++++++++++++++
 16 files changed, 359 insertions(+), 3 deletions(-)

diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index f5a904b..d994575 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -55,11 +55,12 @@ Collate:
     'Table.R'
     'array.R'
     'buffer.R'
+    'io.R'
+    'compression.R'
     'compute.R'
     'csv.R'
     'dictionary.R'
     'feather.R'
-    'io.R'
     'memory_pool.R'
     'message.R'
     'on_exit.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 7fd76c7..4636c12 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -8,6 +8,12 @@ S3method("==","arrow::RecordBatch")
 S3method("==","arrow::ipc::Message")
 S3method(BufferReader,"arrow::Buffer")
 S3method(BufferReader,default)
+S3method(CompressedInputStream,"arrow::io::InputStream")
+S3method(CompressedInputStream,character)
+S3method(CompressedInputStream,fs_path)
+S3method(CompressedOutputStream,"arrow::io::OutputStream")
+S3method(CompressedOutputStream,character)
+S3method(CompressedOutputStream,fs_path)
 S3method(FeatherTableReader,"arrow::io::RandomAccessFile")
 S3method(FeatherTableReader,"arrow::ipc::feather::TableReader")
 S3method(FeatherTableReader,character)
@@ -34,6 +40,7 @@ S3method(RecordBatchStreamWriter,character)
 S3method(RecordBatchStreamWriter,fs_path)
 S3method(as_tibble,"arrow::RecordBatch")
 S3method(as_tibble,"arrow::Table")
+S3method(buffer,"arrow::Buffer")
 S3method(buffer,complex)
 S3method(buffer,default)
 S3method(buffer,integer)
@@ -74,6 +81,9 @@ S3method(write_feather_RecordBatch,default)
 S3method(write_feather_RecordBatch,fs_path)
 export(BufferOutputStream)
 export(BufferReader)
+export(CompressedInputStream)
+export(CompressedOutputStream)
+export(CompressionType)
 export(DateUnit)
 export(FeatherTableReader)
 export(FeatherTableWriter)
@@ -99,6 +109,7 @@ export(boolean)
 export(buffer)
 export(cast_options)
 export(chunked_array)
+export(compression_codec)
 export(csv_convert_options)
 export(csv_parse_options)
 export(csv_read_options)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 51ed4ea..3d493c7 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -181,6 +181,18 @@ Column__data <- function(column) {
     .Call(`_arrow_Column__data`, column)
 }
 
+util___Codec__Create <- function(codec) {
+    .Call(`_arrow_util___Codec__Create`, codec)
+}
+
+io___CompressedOutputStream__Make <- function(codec, raw) {
+    .Call(`_arrow_io___CompressedOutputStream__Make`, codec, raw)
+}
+
+io___CompressedInputStream__Make <- function(codec, raw) {
+    .Call(`_arrow_io___CompressedInputStream__Make`, codec, raw)
+}
+
 compute___CastOptions__initialize <- function(allow_int_overflow, allow_time_truncate, allow_float_truncate) {
     .Call(`_arrow_compute___CastOptions__initialize`, allow_int_overflow, allow_time_truncate, allow_float_truncate)
 }
@@ -553,6 +565,14 @@ io___BufferReader__initialize <- function(buffer) {
     .Call(`_arrow_io___BufferReader__initialize`, buffer)
 }
 
+io___Writable__write <- function(stream, buf) {
+    invisible(.Call(`_arrow_io___Writable__write`, stream, buf))
+}
+
+io___OutputStream__Tell <- function(stream) {
+    .Call(`_arrow_io___OutputStream__Tell`, stream)
+}
+
 io___FileOutputStream__Open <- function(path) {
     .Call(`_arrow_io___FileOutputStream__Open`, path)
 }
diff --git a/r/R/buffer.R b/r/R/buffer.R
index 2fecd0e..ca9b2ee 100644
--- a/r/R/buffer.R
+++ b/r/R/buffer.R
@@ -81,3 +81,8 @@ buffer.complex <- function(x) {
   shared_ptr(`arrow::Buffer`, r___RBuffer__initialize(x))
 }
 
+#' @export
+`buffer.arrow::Buffer` <- function(x) {
+  x
+}
+
diff --git a/r/R/compression.R b/r/R/compression.R
new file mode 100644
index 0000000..083774a
--- /dev/null
+++ b/r/R/compression.R
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include enums.R
+#' @include R6.R
+#' @include io.R
+
+`arrow::util::Codec` <- R6Class("arrow::util::Codec", inherit = `arrow::Object`)
+
+`arrow::io::CompressedOutputStream` <- R6Class("arrow::io::CompressedOutputStream", inherit = `arrow::io::OutputStream`)
+`arrow::io::CompressedInputStream` <- R6Class("arrow::io::CompressedInputStream", inherit = `arrow::io::InputStream`)
+
+#' codec
+#'
+#' @param type type of codec
+#'
+#' @export
+compression_codec <- function(type = "GZIP") {
+  type <- CompressionType[[match.arg(type, names(CompressionType))]]
+  unique_ptr(`arrow::util::Codec`, util___Codec__Create(type))
+}
+
+
+#' Compressed output stream
+#'
+#' @param stream Underlying raw output stream
+#' @param codec a codec
+#' @export
+CompressedOutputStream <- function(stream, codec = compression_codec("GZIP")){
+  UseMethod("CompressedOutputStream")
+}
+
+#' @export
+CompressedOutputStream.character <- function(stream, codec = compression_codec("GZIP")){
+  CompressedOutputStream(fs::path_abs(stream), codec = codec)
+}
+
+#' @export
+CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){
+  CompressedOutputStream(FileOutputStream(stream), codec = codec)
+}
+
+#' @export
+`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec = compression_codec("GZIP")) {
+  assert_that(inherits(codec, "arrow::util::Codec"))
+  shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream))
+}
+
+#' Compressed input stream
+#'
+#' @param stream Underlying raw input stream
+#' @param codec a codec
+#' @export
+CompressedInputStream <- function(stream, codec = codec("GZIP")){
+  UseMethod("CompressedInputStream")
+}
+
+#' @export
+CompressedInputStream.character <- function(stream, codec = compression_codec("GZIP")){
+  CompressedInputStream(fs::path_abs(stream), codec = codec)
+}
+
+#' @export
+CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){
+  CompressedInputStream(ReadableFile(stream), codec = codec)
+}
+
+#' @export
+`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec = compression_codec("GZIP")) {
+  assert_that(inherits(codec, "arrow::util::Codec"))
+  shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream))
+}
diff --git a/r/R/enums.R b/r/R/enums.R
index 35e6aaa..3a6ac5c 100644
--- a/r/R/enums.R
+++ b/r/R/enums.R
@@ -70,3 +70,9 @@ FileMode <- enum("arrow::io::FileMode",
 MessageType <- enum("arrow::ipc::Message::Type",
   NONE = 0L, SCHEMA = 1L, DICTIONARY_BATCH = 2L, RECORD_BATCH = 3L, TENSOR = 4L
 )
+
+#' @rdname DataType
+#' @export
+CompressionType <- enum("arrow::Compression::type",
+  UNCOMPRESSED = 0L, SNAPPY = 1L, GZIP = 2L, BROTLI = 3L, ZSTD = 4L, LZ4 = 5L, LZO = 6L, BZ2 = 7L
+)
diff --git a/r/R/io.R b/r/R/io.R
index b772be3..ad350f3 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -21,7 +21,11 @@
 
 # OutputStream ------------------------------------------------------------
 
-`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`)
+`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`,
+  public = list(
+    write = function(x) io___Writable__write(self, buffer(x))
+  )
+)
 
 #' @title OutputStream
 #'
@@ -38,7 +42,8 @@
 #' @name arrow__io__OutputStream
 `arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit = `arrow::io::Writable`,
   public = list(
-    close = function() io___OutputStream__Close(self)
+    close = function() io___OutputStream__Close(self),
+    tell = function() io___OutputStream__Tell(self)
   )
 )
 
diff --git a/r/man/CompressedInputStream.Rd b/r/man/CompressedInputStream.Rd
new file mode 100644
index 0000000..cfff053
--- /dev/null
+++ b/r/man/CompressedInputStream.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{CompressedInputStream}
+\alias{CompressedInputStream}
+\title{Compressed input stream}
+\usage{
+CompressedInputStream(stream, codec = codec("GZIP"))
+}
+\arguments{
+\item{stream}{Underlying raw input stream}
+
+\item{codec}{a codec}
+}
+\description{
+Compressed input stream
+}
diff --git a/r/man/CompressedOutputStream.Rd b/r/man/CompressedOutputStream.Rd
new file mode 100644
index 0000000..85c4d92
--- /dev/null
+++ b/r/man/CompressedOutputStream.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{CompressedOutputStream}
+\alias{CompressedOutputStream}
+\title{Compressed output stream}
+\usage{
+CompressedOutputStream(stream, codec = compression_codec("GZIP"))
+}
+\arguments{
+\item{stream}{Underlying raw output stream}
+
+\item{codec}{a codec}
+}
+\description{
+Compressed output stream
+}
diff --git a/r/man/DataType.Rd b/r/man/DataType.Rd
index b104140..bf5f1d4 100644
--- a/r/man/DataType.Rd
+++ b/r/man/DataType.Rd
@@ -8,6 +8,7 @@
 \alias{StatusCode}
 \alias{FileMode}
 \alias{MessageType}
+\alias{CompressionType}
 \alias{int8}
 \alias{int16}
 \alias{int32}
@@ -45,6 +46,8 @@ FileMode
 
 MessageType
 
+CompressionType
+
 int8()
 
 int16()
diff --git a/r/man/compression_codec.Rd b/r/man/compression_codec.Rd
new file mode 100644
index 0000000..a7db8ab
--- /dev/null
+++ b/r/man/compression_codec.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/compression.R
+\name{compression_codec}
+\alias{compression_codec}
+\title{codec}
+\usage{
+compression_codec(type = "GZIP")
+}
+\arguments{
+\item{type}{type of codec}
+}
+\description{
+codec
+}
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index a31c401..e726f47 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -517,6 +517,41 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// util___Codec__Create
+std::unique_ptr<arrow::util::Codec> util___Codec__Create(arrow::Compression::type codec);
+RcppExport SEXP _arrow_util___Codec__Create(SEXP codecSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< arrow::Compression::type >::type codec(codecSEXP);
+    rcpp_result_gen = Rcpp::wrap(util___Codec__Create(codec));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___CompressedOutputStream__Make
+std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(const std::unique_ptr<arrow::util::Codec>& codec, const std::shared_ptr<arrow::io::OutputStream>& raw);
+RcppExport SEXP _arrow_io___CompressedOutputStream__Make(SEXP codecSEXP, SEXP rawSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::unique_ptr<arrow::util::Codec>& >::type codec(codecSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::OutputStream>& >::type raw(rawSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___CompressedOutputStream__Make(codec, raw));
+    return rcpp_result_gen;
+END_RCPP
+}
+// io___CompressedInputStream__Make
+std::shared_ptr<arrow::io::CompressedInputStream> io___CompressedInputStream__Make(const std::unique_ptr<arrow::util::Codec>& codec, const std::shared_ptr<arrow::io::InputStream>& raw);
+RcppExport SEXP _arrow_io___CompressedInputStream__Make(SEXP codecSEXP, SEXP rawSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::unique_ptr<arrow::util::Codec>& >::type codec(codecSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::InputStream>& >::type raw(rawSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___CompressedInputStream__Make(codec, raw));
+    return rcpp_result_gen;
+END_RCPP
+}
 // compute___CastOptions__initialize
 std::shared_ptr<arrow::compute::CastOptions> compute___CastOptions__initialize(bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate);
 RcppExport SEXP _arrow_compute___CastOptions__initialize(SEXP allow_int_overflowSEXP, SEXP allow_time_truncateSEXP, SEXP allow_float_truncateSEXP) {
@@ -1550,6 +1585,28 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// io___Writable__write
+void io___Writable__write(const std::shared_ptr<arrow::io::Writable>& stream, const std::shared_ptr<arrow::Buffer>& buf);
+RcppExport SEXP _arrow_io___Writable__write(SEXP streamSEXP, SEXP bufSEXP) {
+BEGIN_RCPP
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::Writable>& >::type stream(streamSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::Buffer>& >::type buf(bufSEXP);
+    io___Writable__write(stream, buf);
+    return R_NilValue;
+END_RCPP
+}
+// io___OutputStream__Tell
+int64_t io___OutputStream__Tell(const std::shared_ptr<arrow::io::OutputStream>& stream);
+RcppExport SEXP _arrow_io___OutputStream__Tell(SEXP streamSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::OutputStream>& >::type stream(streamSEXP);
+    rcpp_result_gen = Rcpp::wrap(io___OutputStream__Tell(stream));
+    return rcpp_result_gen;
+END_RCPP
+}
 // io___FileOutputStream__Open
 std::shared_ptr<arrow::io::FileOutputStream> io___FileOutputStream__Open(const std::string& path);
 RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) {
@@ -2288,6 +2345,9 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1},
     {"_arrow_Column__type", (DL_FUNC) &_arrow_Column__type, 1},
     {"_arrow_Column__data", (DL_FUNC) &_arrow_Column__data, 1},
+    {"_arrow_util___Codec__Create", (DL_FUNC) &_arrow_util___Codec__Create, 1},
+    {"_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2},
+    {"_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2},
     {"_arrow_compute___CastOptions__initialize", (DL_FUNC) &_arrow_compute___CastOptions__initialize, 3},
     {"_arrow_Array__cast", (DL_FUNC) &_arrow_Array__cast, 3},
     {"_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3},
@@ -2381,6 +2441,8 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC) &_arrow_io___MemoryMappedFile__Resize, 2},
     {"_arrow_io___ReadableFile__Open", (DL_FUNC) &_arrow_io___ReadableFile__Open, 1},
     {"_arrow_io___BufferReader__initialize", (DL_FUNC) &_arrow_io___BufferReader__initialize, 1},
+    {"_arrow_io___Writable__write", (DL_FUNC) &_arrow_io___Writable__write, 2},
+    {"_arrow_io___OutputStream__Tell", (DL_FUNC) &_arrow_io___OutputStream__Tell, 1},
     {"_arrow_io___FileOutputStream__Open", (DL_FUNC) &_arrow_io___FileOutputStream__Open, 1},
     {"_arrow_io___BufferOutputStream__Create", (DL_FUNC) &_arrow_io___BufferOutputStream__Create, 1},
     {"_arrow_io___BufferOutputStream__capacity", (DL_FUNC) &_arrow_io___BufferOutputStream__capacity, 1},
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index a657731..4843f95 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -23,12 +23,14 @@
 #include <arrow/api.h>
 #include <arrow/compute/api.h>
 #include <arrow/csv/reader.h>
+#include <arrow/io/compressed.h>
 #include <arrow/io/file.h>
 #include <arrow/io/memory.h>
 #include <arrow/ipc/feather.h>
 #include <arrow/ipc/reader.h>
 #include <arrow/ipc/writer.h>
 #include <arrow/type.h>
+#include <arrow/util/compression.h>
 
 #define STOP_IF_NOT(TEST, MSG)  \
   do {                          \
@@ -129,6 +131,7 @@ RCPP_EXPOSED_ENUM_NODECL(arrow::TimeUnit::type)
 RCPP_EXPOSED_ENUM_NODECL(arrow::StatusCode)
 RCPP_EXPOSED_ENUM_NODECL(arrow::io::FileMode::type)
 RCPP_EXPOSED_ENUM_NODECL(arrow::ipc::Message::Type)
+RCPP_EXPOSED_ENUM_NODECL(arrow::Compression::type)
 
 namespace Rcpp {
 namespace internal {
diff --git a/r/src/compression.cpp b/r/src/compression.cpp
new file mode 100644
index 0000000..4c522d8
--- /dev/null
+++ b/r/src/compression.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+// [[Rcpp::export]]
+std::unique_ptr<arrow::util::Codec> util___Codec__Create(arrow::Compression::type codec) {
+  std::unique_ptr<arrow::util::Codec> out;
+  STOP_IF_NOT_OK(arrow::util::Codec::Create(codec, &out));
+  return out;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::CompressedOutputStream> io___CompressedOutputStream__Make(
+    const std::unique_ptr<arrow::util::Codec>& codec,
+    const std::shared_ptr<arrow::io::OutputStream>& raw) {
+  std::shared_ptr<arrow::io::CompressedOutputStream> stream;
+  STOP_IF_NOT_OK(arrow::io::CompressedOutputStream::Make(codec.get(), raw, &stream));
+  return stream;
+}
+
+// [[Rcpp::export]]
+std::shared_ptr<arrow::io::CompressedInputStream> io___CompressedInputStream__Make(
+    const std::unique_ptr<arrow::util::Codec>& codec,
+    const std::shared_ptr<arrow::io::InputStream>& raw) {
+  std::shared_ptr<arrow::io::CompressedInputStream> stream;
+  STOP_IF_NOT_OK(arrow::io::CompressedInputStream::Make(codec.get(), raw, &stream));
+  return stream;
+}
diff --git a/r/src/io.cpp b/r/src/io.cpp
index b8d2d53..2f9fe30 100644
--- a/r/src/io.cpp
+++ b/r/src/io.cpp
@@ -115,6 +115,23 @@ std::shared_ptr<arrow::io::BufferReader> io___BufferReader__initialize(
   return std::make_shared<arrow::io::BufferReader>(buffer);
 }
 
+// ------- arrow::io::Writable
+
+// [[Rcpp::export]]
+void io___Writable__write(const std::shared_ptr<arrow::io::Writable>& stream,
+                          const std::shared_ptr<arrow::Buffer>& buf) {
+  STOP_IF_NOT_OK(stream->Write(buf->data(), buf->size()));
+}
+
+// ------- arrow::io::OutputStream
+
+// [[Rcpp::export]]
+int64_t io___OutputStream__Tell(const std::shared_ptr<arrow::io::OutputStream>& stream) {
+  int64_t position;
+  STOP_IF_NOT_OK(stream->Tell(&position));
+  return position;
+}
+
 // ------ arrow::io::FileOutputStream
 
 // [[Rcpp::export]]
diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R
new file mode 100644
index 0000000..5ed0df8
--- /dev/null
+++ b/r/tests/testthat/test-compressed.R
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+context("arrow::io::Compressed.*Stream")
+
+test_that("can write Buffer to CompressedOutputStream and read back in CompressedInputStream", {
+  buf <- buffer(as.raw(sample(0:255, size = 1024, replace = TRUE)))
+
+  tf1 <- local_tempfile()
+  stream1 <- CompressedOutputStream(tf1)
+  stream1$write(buf)
+  expect_error(stream1$tell())
+  stream1$close()
+
+  tf2 <- local_tempfile()
+  sink2 <- FileOutputStream(tf2)
+  stream2 <- CompressedOutputStream(sink2)
+  stream2$write(buf)
+  expect_error(stream2$tell())
+  stream2$close()
+  sink2$close()
+
+
+  input1 <- CompressedInputStream(tf1)
+  buf1 <- input1$Read(1024L)
+
+  file2 <- ReadableFile(tf2)
+  input2 <- CompressedInputStream(file2)
+  buf2 <- input2$Read(1024L)
+
+  expect_equal(buf, buf1)
+  expect_equal(buf, buf2)
+})
+