You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 17:38:38 UTC

[GitHub] [arrow] paleolimbot commented on a diff in pull request #12323: ARROW-9235: [R] Support for `connection` class when reading and writing files

paleolimbot commented on code in PR #12323:
URL: https://github.com/apache/arrow/pull/12323#discussion_r847573544


##########
r/src/csv.cpp:
##########
@@ -162,7 +164,32 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> csv___TableReader__Read(
     const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
-  return ValueOrStop(table_reader->Read());
+  // Joins the worker when it goes out of scope (including during stack
+  // unwinding), so a joinable std::thread is never destroyed -- which would
+  // call std::terminate() -- and a thread that was never started is never joined.
+  struct JoinOnExit {
+    std::thread thread;
+    ~JoinOnExit() {
+      if (thread.joinable()) thread.join();
+    }
+  } worker;
+
+  // Read on a worker thread, presumably so RunWithCapturedR() can service calls
+  // into R (e.g. reading from an R connection) on this thread in the meantime.
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    // Capture `fut` by value: this local is returned (moved or elided) when the
+    // lambda returns, so a by-reference capture would dangle while the worker is
+    // still reading. Like `return fut;`, this relies on arrow::Future copies
+    // sharing one underlying state.
+    worker.thread = std::thread(
+        [fut, &table_reader]() mutable { fut.MarkFinished(table_reader->Read()); });
+
+    return fut;
+  });
+
+  return ValueOrStop(result);
 }

Review Comment:
   In order for R `connection` support to work in `read_csv_arrow()`, we need to wrap `table_reader->Read()` with `RunWithCapturedR()`, but we need a cleaner way to do it than what I have here!



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,75 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  // Joins the worker when it goes out of scope (including during stack
+  // unwinding), so a joinable std::thread is never destroyed -- which would
+  // call std::terminate() -- and a thread that was never started is never joined.
+  struct JoinOnExit {
+    std::thread thread;
+    ~JoinOnExit() {
+      if (thread.joinable()) thread.join();
+    }
+  } worker;
+
+  // Read on a worker thread, presumably so RunWithCapturedR() can service calls
+  // into R (e.g. reading from an R connection) on this thread in the meantime.
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    // Capture `fut` by value: this local is returned (moved or elided) when the
+    // lambda returns, so a by-reference capture would dangle while the worker is
+    // still reading. Like `return fut;`, this relies on arrow::Future copies
+    // sharing one underlying state.
+    worker.thread = std::thread([fut, &reader, &names, use_names]() mutable {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  return ValueOrStop(result);
 }
 
 // [[arrow::export]]
 std::shared_ptr<arrow::ipc::feather::Reader> ipc___feather___Reader__Open(
     const std::shared_ptr<arrow::io::RandomAccessFile>& stream) {
-  return ValueOrStop(arrow::ipc::feather::Reader::Open(stream));
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::ipc::feather::Reader>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::ipc::feather::Reader>>::Make();
+
+    thread_ptr = new std::thread(
+        [&] { fut.MarkFinished(arrow::ipc::feather::Reader::Open(stream)); });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   In order for R `connection` support to work in `read_feather()`, we need to wrap `arrow::ipc::feather::Reader::Open(stream)` with `RunWithCapturedR()`, but we need a cleaner way to do it than what I have here!



##########
r/src/feather.cpp:
##########
@@ -48,34 +51,63 @@ int ipc___feather___Reader__version(
 
 // [[arrow::export]]
 std::shared_ptr<arrow::Table> ipc___feather___Reader__Read(
-    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, SEXP columns) {
-  std::shared_ptr<arrow::Table> table;
-
-  switch (TYPEOF(columns)) {
-    case STRSXP: {
-      R_xlen_t n = XLENGTH(columns);
-      std::vector<std::string> names(n);
-      for (R_xlen_t i = 0; i < n; i++) {
-        names[i] = CHAR(STRING_ELT(columns, i));
-      }
-      StopIfNotOk(reader->Read(names, &table));
-      break;
+    const std::shared_ptr<arrow::ipc::feather::Reader>& reader, cpp11::sexp columns) {
+  bool use_names = columns != R_NilValue;
+  std::vector<std::string> names;
+  if (use_names) {
+    cpp11::strings columns_chr(columns);
+    names.reserve(columns_chr.size());
+    for (const auto& name : columns_chr) {
+      names.push_back(name);
     }
-    case NILSXP:
-      StopIfNotOk(reader->Read(&table));
-      break;
-    default:
-      cpp11::stop("incompatible column specification");
-      break;
   }
 
-  return table;
+  std::thread* thread_ptr;
+  auto result = RunWithCapturedR<std::shared_ptr<arrow::Table>>([&]() {
+    auto fut = arrow::Future<std::shared_ptr<arrow::Table>>::Make();
+
+    thread_ptr = new std::thread([&] {
+      std::shared_ptr<arrow::Table> table;
+      arrow::Status read_result;
+      if (use_names) {
+        read_result = reader->Read(names, &table);
+      } else {
+        read_result = reader->Read(&table);
+      }
+
+      if (read_result.ok()) {
+        fut.MarkFinished(table);
+      } else {
+        fut.MarkFinished(read_result);
+      }
+    });
+
+    return fut;
+  });
+
+  thread_ptr->join();
+  delete thread_ptr;

Review Comment:
   In order for R `connection` support to work in `read_feather()`, we need to wrap `reader->Read()` with `RunWithCapturedR()`, but we need a cleaner way to do it than what I have here!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org