You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2022/07/27 12:27:32 UTC

[arrow] 03/06: ARROW-16612: [R] Fix compression inference from filename (#13625)

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 107163fec888e36a2d576d1f992f0e6f41ef7ad1
Author: Neal Richardson <ne...@gmail.com>
AuthorDate: Wed Jul 27 07:02:05 2022 -0400

    ARROW-16612: [R] Fix compression inference from filename (#13625)
    
    This is actually a much larger change than the original issue.
    
    * ~Infer compression from the file extension in `write_parquet()` and pass it to ParquetFileWriter rather than write to a CompressedOutputStream, and don't wrap the file in a CompressedInputStream in `read_parquet()` because that doesn't work (and isn't how compression works for Parquet). Previously, reading from a file with extension `.parquet.gz` etc. would error unless you opened an input stream yourself. This is the original report from ARROW-16612.~ Cut and moved to [ARROW-17221](http [...]
    * Likewise for `read_feather()` and `write_feather()`, which also support compression within the file itself and not around it.
    * Since the whole "detect compression and wrap in a compressed stream" feature seems limited to CSV and JSON, and in making the changes here I was having to hack around that feature, I refactored to pull it out of the internal functions `make_readable_file()` and `make_output_stream()` and do it only in the csv/json functions.
    * In the process of refactoring, I noticed and fixed two bugs: (1) no matter what compression extension you provided to `make_output_stream()`, you would get a gzip-compressed stream because we weren't actually passing the codec to `CompressedOutputStream$create()`; (2) `.lz4` actually needs to be mapped to the "lz4_frame" codec; attempting to write a CSV to a `CompressedOutputStream$create(codec = "lz4")` raises an error. Neither was caught because our tests for this feature only te [...]
    * The refactoring should also mean that ARROW-16619 (inferring compression from URL), as well as from SubTreeFileSystem (S3 buckets etc.), is also supported.
    
    Authored-by: Neal Richardson <ne...@gmail.com>
    Signed-off-by: Neal Richardson <ne...@gmail.com>
---
 r/R/csv.R                          | 40 +++++++++++---------
 r/R/feather.R                      | 21 +++++++----
 r/R/io.R                           | 76 ++++++++++++--------------------------
 r/R/ipc-stream.R                   | 10 -----
 r/R/json.R                         |  5 +++
 r/R/parquet.R                      |  9 +++++
 r/man/make_readable_file.Rd        | 11 +-----
 r/man/read_feather.Rd              |  6 +--
 r/man/read_ipc_stream.Rd           |  6 ---
 r/man/write_feather.Rd             |  9 +++--
 r/man/write_ipc_stream.Rd          |  6 ---
 r/tests/testthat/test-compressed.R |  8 ++++
 r/tests/testthat/test-csv.R        | 25 ++++++++++++-
 r/tests/testthat/test-feather.R    | 16 ++++++++
 r/tests/testthat/test-parquet.R    | 16 ++++++++
 15 files changed, 145 insertions(+), 119 deletions(-)

diff --git a/r/R/csv.R b/r/R/csv.R
index 32ed0e4bee..6adbb40219 100644
--- a/r/R/csv.R
+++ b/r/R/csv.R
@@ -188,7 +188,12 @@ read_delim_arrow <- function(file,
   }
 
   if (!inherits(file, "InputStream")) {
+    compression <- detect_compression(file)
     file <- make_readable_file(file)
+    if (compression != "uncompressed") {
+      # TODO: accept compression and compression_level as args
+      file <- CompressedInputStream$create(file, compression)
+    }
     on.exit(file$close())
   }
   reader <- CsvTableReader$create(
@@ -699,7 +704,6 @@ write_csv_arrow <- function(x,
     )
   }
 
-  # default values are considered missing by base R
   if (missing(include_header) && !missing(col_names)) {
     include_header <- col_names
   }
@@ -712,16 +716,27 @@ write_csv_arrow <- function(x,
   }
 
   x_out <- x
-  if (is.data.frame(x)) {
-    x <- Table$create(x)
-  }
-
-  if (inherits(x, c("Dataset", "arrow_dplyr_query"))) {
-    x <- Scanner$create(x)$ToRecordBatchReader()
+  if (!inherits(x, "ArrowTabular")) {
+    tryCatch(
+      x <- as_record_batch_reader(x),
+      error = function(e) {
+        abort(
+          paste0(
+            "x must be an object of class 'data.frame', 'RecordBatch', ",
+            "'Dataset', 'Table', or 'RecordBatchReader' not '", class(x)[1], "'."
+          )
+        )
+      }
+    )
   }
 
   if (!inherits(sink, "OutputStream")) {
+    compression <- detect_compression(sink)
     sink <- make_output_stream(sink)
+    if (compression != "uncompressed") {
+      # TODO: accept compression and compression_level as args
+      sink <- CompressedOutputStream$create(sink, codec = compression)
+    }
     on.exit(sink$close())
   }
 
@@ -731,17 +746,6 @@ write_csv_arrow <- function(x,
     csv___WriteCSV__Table(x, write_options, sink)
   } else if (inherits(x, c("RecordBatchReader"))) {
     csv___WriteCSV__RecordBatchReader(x, write_options, sink)
-  } else {
-    abort(
-      c(
-        paste0(
-          paste(
-            "x must be an object of class 'data.frame', 'RecordBatch',",
-            "'Dataset', 'Table', or 'RecordBatchReader' not '"
-          ), class(x)[[1]], "'."
-        )
-      )
-    )
   }
 
   invisible(x_out)
diff --git a/r/R/feather.R b/r/R/feather.R
index 03c8a7b5f0..4e2e9947cb 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -38,8 +38,9 @@
 #' @param compression Name of compression codec to use, if any. Default is
 #' "lz4" if LZ4 is available in your build of the Arrow C++ library, otherwise
 #' "uncompressed". "zstd" is the other available codec and generally has better
-#' compression ratios in exchange for slower read and write performance
-#' See [codec_is_available()]. This option is not supported for V1.
+#' compression ratios in exchange for slower read and write performance.
+#' "lz4" is shorthand for the "lz4_frame" codec.
+#' See [codec_is_available()] for details. This option is not supported for V1.
 #' @param compression_level If `compression` is "zstd", you may
 #' specify an integer compression level. If omitted, the compression codec's
 #' default compression level is used.
@@ -67,11 +68,13 @@ write_feather <- function(x,
                           sink,
                           version = 2,
                           chunk_size = 65536L,
-                          compression = c("default", "lz4", "uncompressed", "zstd"),
+                          compression = c("default", "lz4", "lz4_frame", "uncompressed", "zstd"),
                           compression_level = NULL) {
   # Handle and validate options before touching data
   version <- as.integer(version)
   assert_that(version %in% 1:2)
+
+  # TODO(ARROW-17221): if (missing(compression)), we could detect_compression(sink) here
   compression <- match.arg(compression)
   chunk_size <- as.integer(chunk_size)
   assert_that(chunk_size > 0)
@@ -128,7 +131,7 @@ write_feather <- function(x,
 write_ipc_file <- function(x,
                            sink,
                            chunk_size = 65536L,
-                           compression = c("default", "lz4", "uncompressed", "zstd"),
+                           compression = c("default", "lz4", "lz4_frame", "uncompressed", "zstd"),
                            compression_level = NULL) {
   mc <- match.call()
   mc$version <- 2
@@ -147,7 +150,7 @@ write_ipc_file <- function(x,
 #'
 #' @inheritParams read_ipc_stream
 #' @inheritParams read_delim_arrow
-#' @param ... additional parameters, passed to [make_readable_file()].
+#' @inheritParams make_readable_file
 #'
 #' @return A `data.frame` if `as_data_frame` is `TRUE` (the default), or an
 #' Arrow [Table] otherwise
@@ -163,9 +166,13 @@ write_ipc_file <- function(x,
 #' dim(df)
 #' # Can select columns
 #' df <- read_feather(tf, col_select = starts_with("d"))
-read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
+read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, mmap = TRUE) {
   if (!inherits(file, "RandomAccessFile")) {
-    file <- make_readable_file(file, ...)
+    # Compression is handled inside the IPC file format, so we don't need
+    # to detect from the file extension and wrap in a CompressedInputStream
+    # TODO: Why is this the only read_format() functions that allows passing
+    # mmap to make_readable_file?
+    file <- make_readable_file(file, mmap)
     on.exit(file$close())
   }
   reader <- FeatherReader$create(file)
diff --git a/r/R/io.R b/r/R/io.R
index 82e3847df5..fc664ed386 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -229,52 +229,31 @@ mmap_open <- function(path, mode = c("read", "write", "readwrite")) {
 #' Handle a range of possible input sources
 #' @param file A character file name, `raw` vector, or an Arrow input stream
 #' @param mmap Logical: whether to memory-map the file (default `TRUE`)
-#' @param compression If the file is compressed, created a [CompressedInputStream]
-#' with this compression codec, either a [Codec] or the string name of one.
-#' If `NULL` (default) and `file` is a string file name, the function will try
-#' to infer compression from the file extension.
-#' @param filesystem If not `NULL`, `file` will be opened via the
-#' `filesystem$OpenInputFile()` filesystem method, rather than the `io` module's
-#' `MemoryMappedFile` or `ReadableFile` constructors.
 #' @return An `InputStream` or a subclass of one.
 #' @keywords internal
-make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
+make_readable_file <- function(file, mmap = TRUE) {
   if (inherits(file, "SubTreeFileSystem")) {
     filesystem <- file$base_fs
-    # SubTreeFileSystem adds a slash to base_path, but filesystems will reject file names
-    # with trailing slashes, so we need to remove it here.
-    file <- sub("/$", "", file$base_path)
-  }
-  if (is.string(file)) {
+    # SubTreeFileSystem adds a slash to base_path, but filesystems will reject
+    # file names with trailing slashes, so we need to remove it here.
+    path <- sub("/$", "", file$base_path)
+    file <- filesystem$OpenInputFile(path)
+  } else if (is.string(file)) {
     if (is_url(file)) {
       file <- tryCatch(
         {
           fs_and_path <- FileSystem$from_uri(file)
-          filesystem <- fs_and_path$fs
-          fs_and_path$path
+          fs_and_path$fs$OpenInputFile(fs_and_path$path)
         },
         error = function(e) {
           MakeRConnectionInputStream(url(file, open = "rb"))
         }
       )
-    }
-
-    if (is.null(compression)) {
-      # Infer compression from the file path
-      compression <- detect_compression(file)
-    }
-
-    if (!is.null(filesystem)) {
-      file <- filesystem$OpenInputFile(file)
-    } else if (is.string(file) && isTRUE(mmap)) {
+    } else if (isTRUE(mmap)) {
       file <- mmap_open(file)
-    } else if (is.string(file)) {
+    } else {
       file <- ReadableFile$create(file)
     }
-
-    if (is_compressed(compression)) {
-      file <- CompressedInputStream$create(file, compression)
-    }
   } else if (inherits(file, c("raw", "Buffer"))) {
     file <- BufferReader$create(file)
   } else if (inherits(file, "connection")) {
@@ -294,7 +273,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
   file
 }
 
-make_output_stream <- function(x, filesystem = NULL, compression = NULL) {
+make_output_stream <- function(x) {
   if (inherits(x, "connection")) {
     if (!isOpen(x)) {
       open(x, "wb")
@@ -305,45 +284,36 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) {
 
   if (inherits(x, "SubTreeFileSystem")) {
     filesystem <- x$base_fs
-    # SubTreeFileSystem adds a slash to base_path, but filesystems will reject file names
-    # with trailing slashes, so we need to remove it here.
-    x <- sub("/$", "", x$base_path)
+    # SubTreeFileSystem adds a slash to base_path, but filesystems will reject
+    # file names with trailing slashes, so we need to remove it here.
+    path <- sub("/$", "", x$base_path)
+    filesystem$OpenOutputStream(path)
   } else if (is_url(x)) {
     fs_and_path <- FileSystem$from_uri(x)
-    filesystem <- fs_and_path$fs
-    x <- fs_and_path$path
-  }
-
-  if (is.null(compression)) {
-    # Infer compression from sink
-    compression <- detect_compression(x)
-  }
-
-  assert_that(is.string(x))
-  if (is.null(filesystem) && is_compressed(compression)) {
-    CompressedOutputStream$create(x) ## compressed local
-  } else if (is.null(filesystem) && !is_compressed(compression)) {
-    FileOutputStream$create(x) ## uncompressed local
-  } else if (!is.null(filesystem) && is_compressed(compression)) {
-    CompressedOutputStream$create(filesystem$OpenOutputStream(x)) ## compressed remote
+    fs_and_path$fs$OpenOutputStream(fs_and_path$path)
   } else {
-    filesystem$OpenOutputStream(x) ## uncompressed remote
+    assert_that(is.string(x))
+    FileOutputStream$create(x)
   }
 }
 
 detect_compression <- function(path) {
+  if (inherits(path, "SubTreeFileSystem")) {
+    path <- path$base_path
+  }
   if (!is.string(path)) {
     return("uncompressed")
   }
 
-  # Remove any trailing slashes, which FileSystem$from_uri may add
+  # Remove any trailing slashes, which SubTreeFileSystem may add
   path <- sub("/$", "", path)
 
   switch(tools::file_ext(path),
     bz2 = "bz2",
     gz = "gzip",
-    lz4 = "lz4",
+    lz4 = "lz4_frame",
     zst = "zstd",
+    snappy = "snappy",
     "uncompressed"
   )
 }
diff --git a/r/R/ipc-stream.R b/r/R/ipc-stream.R
index 9fea0f9e52..dd59d0f4df 100644
--- a/r/R/ipc-stream.R
+++ b/r/R/ipc-stream.R
@@ -23,11 +23,6 @@
 #' a "stream" format and a "file" format, known as Feather. `write_ipc_stream()`
 #' and [write_feather()] write those formats, respectively.
 #'
-#' `write_arrow()`, a wrapper around `write_ipc_stream()` and `write_feather()`
-#' with some nonstandard behavior, is deprecated. You should explicitly choose
-#' the function that will write the desired IPC format (stream or file) since
-#' either can be written to a file or `OutputStream`.
-#'
 #' @inheritParams write_feather
 #' @param ... extra parameters passed to `write_feather()`.
 #'
@@ -87,11 +82,6 @@ write_to_raw <- function(x, format = c("stream", "file")) {
 #' a "stream" format and a "file" format, known as Feather. `read_ipc_stream()`
 #' and [read_feather()] read those formats, respectively.
 #'
-#' `read_arrow()`, a wrapper around `read_ipc_stream()` and `read_feather()`,
-#' is deprecated. You should explicitly choose
-#' the function that will read the desired IPC format (stream or file) since
-#' a file or `InputStream` may contain either.
-#'
 #' @param file A character file name or URI, `raw` vector, an Arrow input stream,
 #' or a `FileSystem` with path (`SubTreeFileSystem`).
 #' If a file name or URI, an Arrow [InputStream] will be opened and
diff --git a/r/R/json.R b/r/R/json.R
index 19cf6a9299..2b1f4916cb 100644
--- a/r/R/json.R
+++ b/r/R/json.R
@@ -44,7 +44,12 @@ read_json_arrow <- function(file,
                             schema = NULL,
                             ...) {
   if (!inherits(file, "InputStream")) {
+    compression <- detect_compression(file)
     file <- make_readable_file(file)
+    if (compression != "uncompressed") {
+      # TODO: accept compression and compression_level as args
+      file <- CompressedInputStream$create(file, compression)
+    }
     on.exit(file$close())
   }
   tab <- JsonTableReader$create(file, schema = schema, ...)$Read()
diff --git a/r/R/parquet.R b/r/R/parquet.R
index 8cd9daa857..0b3f93b20e 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -36,9 +36,17 @@
 read_parquet <- function(file,
                          col_select = NULL,
                          as_data_frame = TRUE,
+                         # TODO: for consistency with other readers/writers,
+                         # these properties should be enumerated as args here,
+                         # and ParquetArrowReaderProperties$create() should
+                         # accept them, as with ParquetWriterProperties.
+                         # Assembling `props` yourself is something you do with
+                         # ParquetFileReader but not here.
                          props = ParquetArrowReaderProperties$create(),
                          ...) {
   if (!inherits(file, "RandomAccessFile")) {
+    # Compression is handled inside the parquet file format, so we don't need
+    # to detect from the file extension and wrap in a CompressedInputStream
     file <- make_readable_file(file)
     on.exit(file$close())
   }
@@ -156,6 +164,7 @@ write_parquet <- function(x,
   x <- as_writable_table(x)
 
   if (!inherits(sink, "OutputStream")) {
+    # TODO(ARROW-17221): if (missing(compression)), we could detect_compression(sink) here
     sink <- make_output_stream(sink)
     on.exit(sink$close())
   }
diff --git a/r/man/make_readable_file.Rd b/r/man/make_readable_file.Rd
index fe2e298261..1544815211 100644
--- a/r/man/make_readable_file.Rd
+++ b/r/man/make_readable_file.Rd
@@ -4,21 +4,12 @@
 \alias{make_readable_file}
 \title{Handle a range of possible input sources}
 \usage{
-make_readable_file(file, mmap = TRUE, compression = NULL, filesystem = NULL)
+make_readable_file(file, mmap = TRUE)
 }
 \arguments{
 \item{file}{A character file name, \code{raw} vector, or an Arrow input stream}
 
 \item{mmap}{Logical: whether to memory-map the file (default \code{TRUE})}
-
-\item{compression}{If the file is compressed, created a \link{CompressedInputStream}
-with this compression codec, either a \link{Codec} or the string name of one.
-If \code{NULL} (default) and \code{file} is a string file name, the function will try
-to infer compression from the file extension.}
-
-\item{filesystem}{If not \code{NULL}, \code{file} will be opened via the
-\code{filesystem$OpenInputFile()} filesystem method, rather than the \code{io} module's
-\code{MemoryMappedFile} or \code{ReadableFile} constructors.}
 }
 \value{
 An \code{InputStream} or a subclass of one.
diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd
index 07d20b8e01..218a163b99 100644
--- a/r/man/read_feather.Rd
+++ b/r/man/read_feather.Rd
@@ -5,9 +5,9 @@
 \alias{read_ipc_file}
 \title{Read a Feather file (an Arrow IPC file)}
 \usage{
-read_feather(file, col_select = NULL, as_data_frame = TRUE, ...)
+read_feather(file, col_select = NULL, as_data_frame = TRUE, mmap = TRUE)
 
-read_ipc_file(file, col_select = NULL, as_data_frame = TRUE, ...)
+read_ipc_file(file, col_select = NULL, as_data_frame = TRUE, mmap = TRUE)
 }
 \arguments{
 \item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
@@ -24,7 +24,7 @@ of columns, as used in \code{dplyr::select()}.}
 \item{as_data_frame}{Should the function return a \code{data.frame} (default) or
 an Arrow \link{Table}?}
 
-\item{...}{additional parameters, passed to \code{\link[=make_readable_file]{make_readable_file()}}.}
+\item{mmap}{Logical: whether to memory-map the file (default \code{TRUE})}
 }
 \value{
 A \code{data.frame} if \code{as_data_frame} is \code{TRUE} (the default), or an
diff --git a/r/man/read_ipc_stream.Rd b/r/man/read_ipc_stream.Rd
index 567ee9882b..63b50e7c1b 100644
--- a/r/man/read_ipc_stream.Rd
+++ b/r/man/read_ipc_stream.Rd
@@ -27,12 +27,6 @@ Apache Arrow defines two formats for \href{https://arrow.apache.org/docs/format/
 a "stream" format and a "file" format, known as Feather. \code{read_ipc_stream()}
 and \code{\link[=read_feather]{read_feather()}} read those formats, respectively.
 }
-\details{
-\code{read_arrow()}, a wrapper around \code{read_ipc_stream()} and \code{read_feather()},
-is deprecated. You should explicitly choose
-the function that will read the desired IPC format (stream or file) since
-a file or \code{InputStream} may contain either.
-}
 \seealso{
 \code{\link[=write_feather]{write_feather()}} for writing IPC files. \link{RecordBatchReader} for a
 lower-level interface.
diff --git a/r/man/write_feather.Rd b/r/man/write_feather.Rd
index 85c83ff04b..2d8a86f969 100644
--- a/r/man/write_feather.Rd
+++ b/r/man/write_feather.Rd
@@ -10,7 +10,7 @@ write_feather(
   sink,
   version = 2,
   chunk_size = 65536L,
-  compression = c("default", "lz4", "uncompressed", "zstd"),
+  compression = c("default", "lz4", "lz4_frame", "uncompressed", "zstd"),
   compression_level = NULL
 )
 
@@ -18,7 +18,7 @@ write_ipc_file(
   x,
   sink,
   chunk_size = 65536L,
-  compression = c("default", "lz4", "uncompressed", "zstd"),
+  compression = c("default", "lz4", "lz4_frame", "uncompressed", "zstd"),
   compression_level = NULL
 )
 }
@@ -37,8 +37,9 @@ random row access. Default is 64K. This option is not supported for V1.}
 \item{compression}{Name of compression codec to use, if any. Default is
 "lz4" if LZ4 is available in your build of the Arrow C++ library, otherwise
 "uncompressed". "zstd" is the other available codec and generally has better
-compression ratios in exchange for slower read and write performance
-See \code{\link[=codec_is_available]{codec_is_available()}}. This option is not supported for V1.}
+compression ratios in exchange for slower read and write performance.
+"lz4" is shorthand for the "lz4_frame" codec.
+See \code{\link[=codec_is_available]{codec_is_available()}} for details. This option is not supported for V1.}
 
 \item{compression_level}{If \code{compression} is "zstd", you may
 specify an integer compression level. If omitted, the compression codec's
diff --git a/r/man/write_ipc_stream.Rd b/r/man/write_ipc_stream.Rd
index 60c3197732..094e3ad11a 100644
--- a/r/man/write_ipc_stream.Rd
+++ b/r/man/write_ipc_stream.Rd
@@ -22,12 +22,6 @@ Apache Arrow defines two formats for \href{https://arrow.apache.org/docs/format/
 a "stream" format and a "file" format, known as Feather. \code{write_ipc_stream()}
 and \code{\link[=write_feather]{write_feather()}} write those formats, respectively.
 }
-\details{
-\code{write_arrow()}, a wrapper around \code{write_ipc_stream()} and \code{write_feather()}
-with some nonstandard behavior, is deprecated. You should explicitly choose
-the function that will write the desired IPC format (stream or file) since
-either can be written to a file or \code{OutputStream}.
-}
 \examples{
 tf <- tempfile()
 on.exit(unlink(tf))
diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R
index 485e16769f..7d1c1cfd39 100644
--- a/r/tests/testthat/test-compressed.R
+++ b/r/tests/testthat/test-compressed.R
@@ -40,6 +40,14 @@ test_that("Codec attributes", {
   expect_error(cod$level)
 })
 
+test_that("Default compression_level for zstd", {
+  skip_if_not_available("zstd")
+  cod <- Codec$create("zstd")
+  expect_equal(cod$name, "zstd")
+  # TODO: implement $level
+  expect_error(cod$level)
+})
+
 test_that("can write Buffer to CompressedOutputStream and read back in CompressedInputStream", {
   skip_if_not_available("gzip")
   buf <- buffer(as.raw(sample(0:255, size = 1024, replace = TRUE)))
diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R
index d4878e6d67..cd8da2625c 100644
--- a/r/tests/testthat/test-csv.R
+++ b/r/tests/testthat/test-csv.R
@@ -566,8 +566,6 @@ test_that("read/write compressed file successfully", {
   skip_if_not_available("gzip")
   tfgz <- tempfile(fileext = ".csv.gz")
   tf <- tempfile(fileext = ".csv")
-  on.exit(unlink(tf))
-  on.exit(unlink(tfgz))
 
   write_csv_arrow(tbl, tf)
   write_csv_arrow(tbl, tfgz)
@@ -577,6 +575,29 @@ test_that("read/write compressed file successfully", {
     read_csv_arrow(tfgz),
     tbl
   )
+  skip_if_not_available("lz4")
+  tflz4 <- tempfile(fileext = ".csv.lz4")
+  write_csv_arrow(tbl, tflz4)
+  expect_false(file.size(tfgz) == file.size(tflz4))
+  expect_identical(
+    read_csv_arrow(tflz4),
+    tbl
+  )
+})
+
+test_that("read/write compressed filesystem path", {
+  skip_if_not_available("zstd")
+  tfzst <- tempfile(fileext = ".csv.zst")
+  fs <- LocalFileSystem$create()$path(tfzst)
+  write_csv_arrow(tbl, fs)
+
+  tf <- tempfile(fileext = ".csv")
+  write_csv_arrow(tbl, tf)
+  expect_lt(file.size(tfzst), file.size(tf))
+  expect_identical(
+    read_csv_arrow(fs),
+    tbl
+  )
 })
 
 test_that("read_csv_arrow() can read sub-second timestamps with col_types T setting (ARROW-15599)", {
diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R
index 1ef2ecf3e9..8d7a43ad06 100644
--- a/r/tests/testthat/test-feather.R
+++ b/r/tests/testthat/test-feather.R
@@ -207,6 +207,22 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A
   )
 })
 
+test_that("write_feather() does not detect compression from filename", {
+  # TODO(ARROW-17221): should this be supported?
+  without <- tempfile(fileext = ".arrow")
+  with_zst <- tempfile(fileext = ".arrow.zst")
+  write_feather(mtcars, without)
+  write_feather(mtcars, with_zst)
+  expect_equal(file.size(without), file.size(with_zst))
+})
+
+test_that("read_feather() handles (ignores) compression in filename", {
+  df <- tibble::tibble(x = 1:5)
+  f <- tempfile(fileext = ".parquet.zst")
+  write_feather(df, f)
+  expect_equal(read_feather(f), df)
+})
+
 test_that("read_feather() and write_feather() accept connection objects", {
   skip_if_not(CanRunWithCapturedR())
 
diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R
index b75892bc84..32170534a4 100644
--- a/r/tests/testthat/test-parquet.R
+++ b/r/tests/testthat/test-parquet.R
@@ -185,6 +185,22 @@ test_that("write_parquet() defaults to snappy compression", {
   expect_equal(file.size(tmp1), file.size(tmp2))
 })
 
+test_that("write_parquet() does not detect compression from filename", {
+  # TODO(ARROW-17221): should this be supported?
+  without <- tempfile(fileext = ".parquet")
+  with_gz <- tempfile(fileext = ".parquet.gz")
+  write_parquet(mtcars, without)
+  write_parquet(mtcars, with_gz)
+  expect_equal(file.size(with_gz), file.size(without))
+})
+
+test_that("read_parquet() handles (ignores) compression in filename", {
+  df <- tibble::tibble(x = 1:5)
+  f <- tempfile(fileext = ".parquet.gz")
+  write_parquet(df, f)
+  expect_equal(read_parquet(f), df)
+})
+
 test_that("Factors are preserved when writing/reading from Parquet", {
   fct <- factor(c("a", "b"), levels = c("c", "a", "b"))
   ord <- factor(c("a", "b"), levels = c("c", "a", "b"), ordered = TRUE)