Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/02 20:57:33 UTC

[GitHub] [arrow] nealrichardson commented on a change in pull request #8101: ARROW-9868: [C++][R] Provide CopyFiles for copying files between FileSystems

nealrichardson commented on a change in pull request #8101:
URL: https://github.com/apache/arrow/pull/8101#discussion_r482457407



##########
File path: r/R/filesystem.R
##########
@@ -280,6 +280,21 @@ SubTreeFileSystem$create <- function(base_path, base_fs) {
   shared_ptr(SubTreeFileSystem, xp)
 }
 
+#' Copy files between FileSystems
+#'
+#' @param src_fs The FileSystem from which files will be copied.
+#' @param src_paths The paths of files to be copied.
+#' @param dest_fs The FileSystem into which files will be copied.
+#' @param dest_paths Where the copied files should be placed.
+#' @param chunk_size The maximum size of block to read before flushing
+#' to the destination file. A larger chunk_size will use more memory while
+#' copying but may help accommodate high latency FileSystems.
+#' @param use_threads Whether to copy files in parallel.

Review comment:
       The R package uses a global use-threads setting rather than exposing it in every function signature. So instead of adding `use_threads` as a function argument, call `option_use_threads()` in L295. See `as.data.frame.RecordBatch` for an example.

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -439,6 +440,28 @@ Result<std::shared_ptr<io::OutputStream>> SlowFileSystem::OpenAppendStream(
   return base_fs_->OpenAppendStream(path);
 }
 
+Status CopyFiles(const std::shared_ptr<FileSystem>& src_fs,
+                 const std::vector<std::string>& src_paths,
+                 const std::shared_ptr<FileSystem>& dest_fs,
+                 const std::vector<std::string>& dest_paths, int64_t chunk_size,
+                 bool use_threads) {
+  if (src_paths.size() != dest_paths.size()) {
+    return Status::Invalid("Trying to copy ", src_paths.size(), " files into ",
+                           dest_paths.size(), " paths.");
+  }
+
+  return ::arrow::internal::OptionalParallelFor(
+      use_threads, static_cast<int>(src_paths.size()), [&](int i) {
+        ARROW_ASSIGN_OR_RAISE(auto src, src_fs->OpenInputStream(src_paths[i]));
+        auto dest_dir = internal::GetAbstractPathParent(dest_paths[i]).first;
+        if (!dest_dir.empty()) {
+          RETURN_NOT_OK(dest_fs->CreateDir(dest_dir));
+        }
+        ARROW_ASSIGN_OR_RAISE(auto dest, dest_fs->OpenOutputStream(dest_paths[i]));
+        return internal::CopyStream(src, dest, chunk_size);

Review comment:
       Should CopyStream also be parallelized? Looking at its implementation, I wonder whether we could benefit from having separate threads do the Read and the Write, so that the next chunk is read while the current one is being written.



