Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 03:21:20 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #12323: ARROW-9235: [R] Support for `connection` class when reading and writing files

westonpace commented on code in PR #12323:
URL: https://github.com/apache/arrow/pull/12323#discussion_r847893866


##########
r/R/io.R:
##########
@@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
   }
   if (is.string(file)) {
     if (is_url(file)) {
-      fs_and_path <- FileSystem$from_uri(file)
-      filesystem <- fs_and_path$fs
-      file <- fs_and_path$path
+      file <- tryCatch({
+        fs_and_path <- FileSystem$from_uri(file)
+        filesystem <- fs_and_path$fs
+        fs_and_path$path
+      }, error = function(e) {
+        MakeRConnectionInputStream(url(file, open = "rb"))

Review Comment:
   What does it look like if this call fails?  For example, if I'm a user and I mistakenly pass the wrong URI to `make_readable_file` will the error look reasonable?



##########
r/R/io.R:
##########
@@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
   }
   if (is.string(file)) {
     if (is_url(file)) {
-      fs_and_path <- FileSystem$from_uri(file)
-      filesystem <- fs_and_path$fs
-      file <- fs_and_path$path
+      file <- tryCatch({
+        fs_and_path <- FileSystem$from_uri(file)
+        filesystem <- fs_and_path$fs
+        fs_and_path$path
+      }, error = function(e) {
+        MakeRConnectionInputStream(url(file, open = "rb"))
+      })
     }
+
     if (is.null(compression)) {
       # Infer compression from the file path
       compression <- detect_compression(file)
     }
+
     if (!is.null(filesystem)) {
       file <- filesystem$OpenInputFile(file)
-    } else if (isTRUE(mmap)) {
+    } else if (is.string(file) && isTRUE(mmap)) {
       file <- mmap_open(file)
-    } else {
+    } else if (is.string(file)) {
       file <- ReadableFile$create(file)
     }
+
     if (!identical(compression, "uncompressed")) {
       file <- CompressedInputStream$create(file, compression)
     }
   } else if (inherits(file, c("raw", "Buffer"))) {
     file <- BufferReader$create(file)
+  } else if (inherits(file, "connection")) {
+    if (!isOpen(file)) {
+      open(file, "rb")
+    }
+
+    # isSeekable() is not sufficient to check for seekability
+    # because we rely on seek(whence = "end") to get the size
+    # of the stream and a gzfile() is "seekable".

Review Comment:
   I assume this comment is related to the "seek to the end and back" behavior of RConnectionRandomAccessFile's constructor.  Should you move it there?  I'm not sure this comment makes sense where it is.



##########
r/src/io.cpp:
##########
@@ -207,7 +209,209 @@ void io___BufferOutputStream__Write(
   StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
 }
 
-// TransformInputStream::TransformFunc wrapper
+// ------ RConnectionInputStream / RConnectionOutputStream
+
+class RConnectionFileInterface : public virtual arrow::io::FileInterface {
+ public:
+  explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
+      : connection_sexp_(connection_sexp), closed_(false) {
+    check_closed();
+  }
+
+  arrow::Status Close() {
+    if (closed_) {
+      return arrow::Status::OK();
+    }
+
+    auto result = SafeCallIntoR<bool>([&]() {
+      cpp11::package("base")["close"](connection_sexp_);
+      return true;
+    });
+
+    RETURN_NOT_OK(result);
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> Tell() const {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_);
+    return cpp11::as_cpp<int64_t>(result);
+  }
+
+  bool closed() const { return closed_; }
+
+ protected:
+  cpp11::sexp connection_sexp_;
+
+  // Define the logic here because multiple inheritance makes it difficult
+  // for this base class, the InputStream and the RandomAccessFile
+  // interfaces to co-exist.
+  arrow::Result<int64_t> ReadBase(int64_t nbytes, void* out) {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    return SafeCallIntoR<int64_t>([&] {
+      cpp11::function read_bin = cpp11::package("base")["readBin"];
+      cpp11::writable::raws ptype((R_xlen_t)0);
+      cpp11::integers n = cpp11::as_sexp<int>(nbytes);
+
+      cpp11::sexp result = read_bin(connection_sexp_, ptype, n);
+
+      int64_t result_size = cpp11::safe[Rf_xlength](result);
+      memcpy(out, cpp11::safe[RAW](result), result_size);
+      return result_size;
+    });
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadBase(int64_t nbytes) {
+    arrow::BufferBuilder builder;
+    RETURN_NOT_OK(builder.Reserve(nbytes));
+
+    arrow::Result<int64_t> result;
+    RETURN_NOT_OK(result = ReadBase(nbytes, builder.mutable_data()));
+
+    builder.UnsafeAdvance(result.ValueOrDie());
+    return builder.Finish();
+  }
+
+  arrow::Status WriteBase(const void* data, int64_t nbytes) {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    auto result = SafeCallIntoR<bool>([&]() {

Review Comment:
   Might be nice if there was a `SafeCallIntoR<void>`.  It can be a bit of a pain because you can't create `Result<void>` but with some template specialization we could probably make a version that returns `Status`.
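
One way the `Status`-returning variant suggested above could be shaped is with a helper struct specialized on `void`. This is only a sketch under loud assumptions: `Status`, `Result`, `SafeCallImpl`, and `SafeCall` below are hypothetical stand-ins written for illustration, not Arrow's or the R package's actual types, and C++ exceptions stand in for R errors. The real `SafeCallIntoR` also has to hand the call to the R main thread, which is not modeled here.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status (NOT the Arrow class).
class Status {
 public:
  static Status OK() { return Status(); }
  static Status Error(std::string msg) {
    Status s;
    s.ok_ = false;
    s.msg_ = std::move(msg);
    return s;
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  bool ok_ = true;
  std::string msg_;
};

// Hypothetical stand-in for arrow::Result<T> (NOT the Arrow class).
template <typename T>
class Result {
 public:
  Result(T value) : value_(std::move(value)) {}
  Result(Status status) : status_(std::move(status)), value_() {}
  bool ok() const { return status_.ok(); }
  const Status& status() const { return status_; }
  const T& value() const {
    assert(ok());  // reading the value of a failed Result is a bug
    return value_;
  }

 private:
  Status status_;
  T value_;
};

// Primary template: a value-returning callback yields Result<T>.
template <typename T>
struct SafeCallImpl {
  using ReturnType = Result<T>;
  static ReturnType Call(const std::function<T()>& fn) {
    try {
      return ReturnType(fn());
    } catch (const std::exception& e) {
      return ReturnType(Status::Error(e.what()));
    }
  }
};

// Specialization: a void callback has no value to carry, so return Status.
template <>
struct SafeCallImpl<void> {
  using ReturnType = Status;
  static ReturnType Call(const std::function<void()>& fn) {
    try {
      fn();
      return Status::OK();
    } catch (const std::exception& e) {
      return Status::Error(e.what());
    }
  }
};

// Single entry point; the return type is chosen by the helper struct.
template <typename T>
typename SafeCallImpl<T>::ReturnType SafeCall(std::function<T()> fn) {
  return SafeCallImpl<T>::Call(fn);
}
```

With something of this shape, a call site like `Close()` could write `return SafeCall<void>([&] { ... });` and drop the dummy `return true;`.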



##########
r/src/csv.cpp:
##########
@@ -162,7 +164,19 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> csv___TableReader__Read(
     const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
-  return ValueOrStop(table_reader->Read());
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] { fut.MarkFinished(table_reader->Read()); });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;
+
+  return ValueOrStop(result);
 }

Review Comment:
   Luckily we have `table_reader->ReadAsync()` for just this purpose (I think).



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,63 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;
+
+  return ValueOrStop(result);
 }
 
 // [[arrow::export]]
 std::shared_ptr<arrow::ipc::feather::Reader> ipc___feather___Reader__Open(
     const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
-  return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::ipc::feather::Reader>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::ipc::feather::Reader>>::Make();
+
+    thread_ptr = new std::thread(
+        [&] { fut.MarkFinished(arrow::ipc::feather::Reader::Open(stream)); });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   Same comment as the other feather method.



##########
r/src/io.cpp:
##########
@@ -207,7 +209,209 @@ void io___BufferOutputStream__Write(
   StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
 }
 
-// TransformInputStream::TransformFunc wrapper
+// ------ RConnectionInputStream / RConnectionOutputStream
+
+class RConnectionFileInterface : public virtual arrow::io::FileInterface {
+ public:
+  explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
+      : connection_sexp_(connection_sexp), closed_(false) {
+    check_closed();
+  }
+
+  arrow::Status Close() {
+    if (closed_) {
+      return arrow::Status::OK();
+    }
+
+    auto result = SafeCallIntoR<bool>([&]() {
+      cpp11::package("base")["close"](connection_sexp_);
+      return true;
+    });
+
+    RETURN_NOT_OK(result);
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> Tell() const {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_);
+    return cpp11::as_cpp<int64_t>(result);
+  }
+
+  bool closed() const { return closed_; }
+
+ protected:
+  cpp11::sexp connection_sexp_;
+
+  // Define the logic here because multiple inheritance makes it difficult
+  // for this base class, the InputStream and the RandomAccessFile
+  // interfaces to co-exist.
+  arrow::Result<int64_t> ReadBase(int64_t nbytes, void* out) {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    return SafeCallIntoR<int64_t>([&] {

Review Comment:
   In the category of "probably not worth the effort & complexity but pedantic note for the purists", it might be *slightly* nicer if you overrode the async versions of the `arrow::io::FileInterface` methods and changed the sync versions to call the async versions (instead of vice-versa).
   
   The only real gain is that you can avoid spinning up an I/O thread for no reason.  I/O threads are supposed to block on long operations so it isn't really a problem but it is a slight bit of extra overhead that isn't necessary.
   
   ![safe-call-into-r-async](https://user-images.githubusercontent.com/1696093/162872351-6e24e9bd-dbf2-4ed1-9d48-c06691d2d944.png)
   
   In fact, now that I draw this picture, I wonder if there might be some way to handle this by having an "R filesystem" whose I/O context's executor was the R main thread :thinking: .  Not something we need to tackle right now.



##########
r/src/io.cpp:
##########
@@ -207,7 +209,209 @@ void io___BufferOutputStream__Write(
   StopIfNotOk(stream->Write(RAW(bytes), bytes.size()));
 }
 
-// TransformInputStream::TransformFunc wrapper
+// ------ RConnectionInputStream / RConnectionOutputStream
+
+class RConnectionFileInterface : public virtual arrow::io::FileInterface {
+ public:
+  explicit RConnectionFileInterface(cpp11::sexp connection_sexp)
+      : connection_sexp_(connection_sexp), closed_(false) {
+    check_closed();
+  }
+
+  arrow::Status Close() {
+    if (closed_) {
+      return arrow::Status::OK();
+    }
+
+    auto result = SafeCallIntoR<bool>([&]() {
+      cpp11::package("base")["close"](connection_sexp_);
+      return true;
+    });
+
+    RETURN_NOT_OK(result);
+    closed_ = true;
+    return arrow::Status::OK();
+  }
+
+  arrow::Result<int64_t> Tell() const {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_);
+    return cpp11::as_cpp<int64_t>(result);
+  }
+
+  bool closed() const { return closed_; }
+
+ protected:
+  cpp11::sexp connection_sexp_;
+
+  // Define the logic here because multiple inheritance makes it difficult
+  // for this base class, the InputStream and the RandomAccessFile
+  // interfaces to co-exist.
+  arrow::Result<int64_t> ReadBase(int64_t nbytes, void* out) {
+    if (closed()) {
+      return arrow::Status::IOError("R connection is closed");
+    }
+
+    return SafeCallIntoR<int64_t>([&] {
+      cpp11::function read_bin = cpp11::package("base")["readBin"];
+      cpp11::writable::raws ptype((R_xlen_t)0);
+      cpp11::integers n = cpp11::as_sexp<int>(nbytes);
+
+      cpp11::sexp result = read_bin(connection_sexp_, ptype, n);
+
+      int64_t result_size = cpp11::safe[Rf_xlength](result);
+      memcpy(out, cpp11::safe[RAW](result), result_size);
+      return result_size;
+    });
+  }
+
+  arrow::Result<std::shared_ptr<arrow::Buffer>> ReadBase(int64_t nbytes) {
+    arrow::BufferBuilder builder;
+    RETURN_NOT_OK(builder.Reserve(nbytes));
+
+    arrow::Result<int64_t> result;
+    RETURN_NOT_OK(result = ReadBase(nbytes, builder.mutable_data()));
+
+    builder.UnsafeAdvance(result.ValueOrDie());
+    return builder.Finish();

Review Comment:
   ```suggestion
       ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadBase(nbytes, builder.mutable_data()));
       builder.UnsafeAdvance(bytes_read);
       return builder.Finish();
   ```
   
   You should never have to use `ValueOrDie` (except in a few cases like a constructor where it is impossible to return a status).
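
To illustrate why the assign-or-raise form is preferable, here is a small self-contained sketch. Everything in it is an assumption made for illustration: `Status`, `Result`, `ReadBytes`, `ReadAndDouble`, and the `SKETCH_ASSIGN_OR_RAISE` macro are hypothetical stand-ins that only mimic the shape of Arrow's `Result` and `ARROW_ASSIGN_OR_RAISE`. The point is that a failed inner call becomes a returned error for the caller instead of aborting the process.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status (NOT the Arrow class).
class Status {
 public:
  static Status OK() { return Status(); }
  static Status Error(std::string msg) {
    Status s;
    s.ok_ = false;
    s.msg_ = std::move(msg);
    return s;
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  bool ok_ = true;
  std::string msg_;
};

// Hypothetical stand-in for arrow::Result<T> (NOT the Arrow class).
template <typename T>
class Result {
 public:
  Result(T value) : value_(std::move(value)) {}
  Result(Status status) : status_(std::move(status)), value_() {}
  bool ok() const { return status_.ok(); }
  const Status& status() const { return status_; }
  const T& value() const {
    assert(ok());  // reading the value of a failed Result is a bug
    return value_;
  }

 private:
  Status status_;
  T value_;
};

// Hypothetical macro with the same intent as ARROW_ASSIGN_OR_RAISE:
// on failure, return the error to the caller; on success, assign the value.
#define SKETCH_ASSIGN_OR_RAISE(lhs, expr)                   \
  do {                                                      \
    auto sketch_result = (expr);                            \
    if (!sketch_result.ok()) return sketch_result.status(); \
    lhs = sketch_result.value();                            \
  } while (0)

// Hypothetical low-level read that can fail.
Result<int64_t> ReadBytes(int64_t nbytes) {
  if (nbytes < 0) return Status::Error("negative read size");
  return nbytes;
}

// The caller forwards failures instead of dying on them.
Result<int64_t> ReadAndDouble(int64_t nbytes) {
  int64_t bytes_read = 0;
  SKETCH_ASSIGN_OR_RAISE(bytes_read, ReadBytes(nbytes));
  return bytes_read * 2;
}
```

Here `ReadAndDouble(-1)` hands the "negative read size" error back to its caller, which can then surface it as an ordinary R error rather than crashing the session.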



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,63 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   Unfortunately we do not have a `reader->ReadAsync` here.  There *is* `arrow::ipc::RecordBatchFileReader::ReadRecordBatchAsync` but it is private at the moment and is also not exposed through `arrow::ipc::feather` at all.
   
   So we could create a JIRA to expose that in C++.  In the meantime, the "standard" way to solve "the underlying file reader doesn't have a true asynchronous implementation" is to do:
   
   ```
   const auto& io_context = arrow::io::default_io_context();
   auto fut = io_context.executor()->Submit(...);
   ```
   
   That should work ok here.
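
The argument to `Submit(...)` above is elided and is left that way. As a standard-library analogue only, the sketch below shows the same shape: a blocking read is handed to a task runner that returns a future, so the caller never manages a raw `std::thread` with `new`, `join`, and `delete`. `BlockingRead`, `ReadInBackground`, and `ReadAndWait` are hypothetical names, and `std::async` merely stands in for an executor; it is not how Arrow's I/O thread pool is implemented.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a blocking reader call such as Reader::Read.
std::vector<int> BlockingRead(const std::string& path) {
  if (path.empty()) throw std::invalid_argument("empty path");
  return {1, 2, 3, 4, 5};
}

// Hand the blocking call to a task runner and return a future right away.
// The future owns the task's lifetime, so there is no thread to leak.
std::future<std::vector<int>> ReadInBackground(std::string path) {
  return std::async(std::launch::async,
                    [path]() { return BlockingRead(path); });
}

// Block until the background read finishes; a failure inside the task
// is rethrown here by get().
std::vector<int> ReadAndWait(const std::string& path) {
  std::future<std::vector<int>> fut = ReadInBackground(path);
  assert(fut.valid());
  return fut.get();
}
```

In the PR's setting, the future returned by the executor would be what the `RunWithCapturedR` callback returns, with errors travelling through the future instead of a manual `MarkFinished` branch.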



##########
r/tests/testthat/test-feather.R:
##########
@@ -181,6 +181,17 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A
   )
 })
 
+test_that("read_feather() and write_feather() accept connection objects", {
+  tf <- tempfile()
+  on.exit(unlink(tf))
+  write_feather(tibble::tibble(x = 1:5), file(tf))

Review Comment:
   That should be handled but I agree it is an interesting test.  That would apply more for CSV than feather though.  I don't think we do any parallel column stuff with feather (decoding in feather is trivial).


