You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/02 12:00:24 UTC

[GitHub] [arrow] fsaintjacques commented on a change in pull request #8305: ARROW-9782: [C++][Dataset] More configurable Dataset writing

fsaintjacques commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r498758123



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  for (std::shared_ptr<RecordBatch> batch;;) {
+    RETURN_NOT_OK(batches->ReadNext(&batch));
+    if (batch == nullptr) break;
+    RETURN_NOT_OK(Write(batch));
+  }
+  return Status::OK();
+}
+
+struct NextBasenameGenerator {
+  static Result<NextBasenameGenerator> Make(const std::string& basename_template) {
+    if (basename_template.find(fs::internal::kSep) != std::string::npos) {
+      return Status::Invalid("basename_template contained '/'");
+    }
+    size_t token_start = basename_template.find(token());
+    if (token_start == std::string::npos) {
+      return Status::Invalid("basename_template did not contain '{i}'");
+    }
+    return NextBasenameGenerator{basename_template, 0, token_start,
+                                 token_start + token().size()};
+  }
 
-  /// The basename of files written by this WriteTask. Extensions
-  /// are derived from format
-  std::string basename;
+  static const std::string& token() {
+    static const std::string token = "{i}";
+    return token;
+  }
 
-  /// The partitioning with which paths will be generated
-  std::shared_ptr<Partitioning> partitioning;
+  const std::string& template_;
+  size_t i_, token_start_, token_end_;
 
-  /// The format in which fragments will be written
-  std::shared_ptr<FileFormat> format;
+  std::string operator()() {
+    return template_.substr(0, token_start_) + std::to_string(i_++) +
+           template_.substr(token_end_);
+  }
+};
 
-  /// The FileSystem and base directory into which fragments will be written
-  std::shared_ptr<fs::FileSystem> filesystem;
-  std::string base_dir;
+using MutexedWriter = util::Mutexed<std::shared_ptr<FileWriter>>;
 
-  /// Batches to be written
-  std::shared_ptr<RecordBatchReader> batches;
+struct WriterSet {
+  WriterSet(NextBasenameGenerator next_basename,
+            const FileSystemDatasetWriteOptions& write_options)
+      : next_basename_(std::move(next_basename)),
+        base_dir_(fs::internal::EnsureTrailingSlash(write_options.base_dir)),
+        write_options_(write_options) {}
 
-  /// An Expression already satisfied by every batch to be written
-  std::shared_ptr<Expression> partition_expression;
-};
+  Result<std::shared_ptr<MutexedWriter>> Get(const Expression& partition_expression,
+                                             const std::shared_ptr<Schema>& schema) {
+    ARROW_ASSIGN_OR_RAISE(auto part_segments,
+                          write_options_.partitioning->Format(partition_expression));
+    std::string dir = base_dir_ + part_segments;
 
-Status WriteTask::Execute() {
-  std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
-  // TODO(bkietz) these calls to Partition() should be scattered across a TaskGroup
-  for (auto maybe_batch : IteratorFromReader(batches)) {
-    ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-    ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, partitioning->Partition(batch));
-    for (auto&& partitioned_batch : partitioned_batches) {
-      AndExpression expr(std::move(partitioned_batch.partition_expression),
-                         partition_expression);
-      ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
-      path = fs::internal::EnsureLeadingSlash(path);
-      path_to_batches[path].push_back(std::move(partitioned_batch.batch));
-    }
-  }
+    util::Mutex::Guard writer_lock;
+
+    auto set_lock = mutex_.Lock();
 
-  for (auto&& path_batches : path_to_batches) {
-    auto dir = base_dir + path_batches.first;
-    RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+    auto writer =

Review comment:
       I don't understand the logic here about dual locking (set_lock, writer_lock).

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  for (std::shared_ptr<RecordBatch> batch;;) {

Review comment:
       Could this be written as a while-loop instead of a `for (...;;)` loop with an empty condition?

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  for (std::shared_ptr<RecordBatch> batch;;) {
+    RETURN_NOT_OK(batches->ReadNext(&batch));
+    if (batch == nullptr) break;
+    RETURN_NOT_OK(Write(batch));
+  }
+  return Status::OK();
+}
+
+struct NextBasenameGenerator {
+  static Result<NextBasenameGenerator> Make(const std::string& basename_template) {
+    if (basename_template.find(fs::internal::kSep) != std::string::npos) {
+      return Status::Invalid("basename_template contained '/'");
+    }
+    size_t token_start = basename_template.find(token());
+    if (token_start == std::string::npos) {
+      return Status::Invalid("basename_template did not contain '{i}'");
+    }
+    return NextBasenameGenerator{basename_template, 0, token_start,
+                                 token_start + token().size()};
+  }
 
-  /// The basename of files written by this WriteTask. Extensions
-  /// are derived from format
-  std::string basename;
+  static const std::string& token() {
+    static const std::string token = "{i}";
+    return token;
+  }
 
-  /// The partitioning with which paths will be generated
-  std::shared_ptr<Partitioning> partitioning;
+  const std::string& template_;
+  size_t i_, token_start_, token_end_;
 
-  /// The format in which fragments will be written
-  std::shared_ptr<FileFormat> format;
+  std::string operator()() {
+    return template_.substr(0, token_start_) + std::to_string(i_++) +
+           template_.substr(token_end_);
+  }
+};
 
-  /// The FileSystem and base directory into which fragments will be written
-  std::shared_ptr<fs::FileSystem> filesystem;
-  std::string base_dir;
+using MutexedWriter = util::Mutexed<std::shared_ptr<FileWriter>>;
 
-  /// Batches to be written
-  std::shared_ptr<RecordBatchReader> batches;
+struct WriterSet {
+  WriterSet(NextBasenameGenerator next_basename,
+            const FileSystemDatasetWriteOptions& write_options)
+      : next_basename_(std::move(next_basename)),
+        base_dir_(fs::internal::EnsureTrailingSlash(write_options.base_dir)),
+        write_options_(write_options) {}
 
-  /// An Expression already satisfied by every batch to be written
-  std::shared_ptr<Expression> partition_expression;
-};
+  Result<std::shared_ptr<MutexedWriter>> Get(const Expression& partition_expression,
+                                             const std::shared_ptr<Schema>& schema) {
+    ARROW_ASSIGN_OR_RAISE(auto part_segments,
+                          write_options_.partitioning->Format(partition_expression));
+    std::string dir = base_dir_ + part_segments;
 
-Status WriteTask::Execute() {
-  std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
-  // TODO(bkietz) these calls to Partition() should be scattered across a TaskGroup
-  for (auto maybe_batch : IteratorFromReader(batches)) {
-    ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-    ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, partitioning->Partition(batch));
-    for (auto&& partitioned_batch : partitioned_batches) {
-      AndExpression expr(std::move(partitioned_batch.partition_expression),
-                         partition_expression);
-      ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
-      path = fs::internal::EnsureLeadingSlash(path);
-      path_to_batches[path].push_back(std::move(partitioned_batch.batch));
-    }
-  }
+    util::Mutex::Guard writer_lock;
+
+    auto set_lock = mutex_.Lock();
 
-  for (auto&& path_batches : path_to_batches) {
-    auto dir = base_dir + path_batches.first;
-    RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+    auto writer =
+        internal::GetOrInsertGenerated(&dir_to_writer_, dir, [&](const std::string&) {
+          auto writer = std::make_shared<MutexedWriter>();
+          writer_lock = writer->Lock();
+          return writer;
+        })->second;
 
-    auto path = fs::internal::ConcatAbstractPath(dir, basename);
-    ARROW_ASSIGN_OR_RAISE(auto destination, filesystem->OpenOutputStream(path));
+    if (writer_lock) {
+      // NB: next_basename_() must be invoked with the set_lock held
+      auto path = fs::internal::ConcatAbstractPath(dir, next_basename_());
+      set_lock.Unlock();
 
-    DCHECK(!path_batches.second.empty());
-    ARROW_ASSIGN_OR_RAISE(auto reader,
-                          RecordBatchReader::Make(std::move(path_batches.second)));
-    RETURN_NOT_OK(format->WriteFragment(reader.get(), destination.get()));
+      RETURN_NOT_OK(write_options_.filesystem->CreateDir(dir));
+
+      ARROW_ASSIGN_OR_RAISE(auto destination,
+                            write_options_.filesystem->OpenOutputStream(path));
+
+      ARROW_ASSIGN_OR_RAISE(**writer, write_options_.format()->MakeWriter(
+                                          std::move(destination), schema,
+                                          write_options_.file_write_options));
+    }
+
+    return writer;
   }
 
-  return Status::OK();
-}
+  Status FinishAll(internal::TaskGroup* task_group) {
+    for (const auto& dir_writer : dir_to_writer_) {
+      task_group->Append([&] {
+        std::shared_ptr<FileWriter> writer = **dir_writer.second;
+        return writer->Finish();
+      });
+    }
 
-Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,
-                                std::shared_ptr<FileFormat> format,
-                                std::shared_ptr<fs::FileSystem> filesystem,
-                                std::string base_dir,
-                                std::shared_ptr<Partitioning> partitioning,
-                                std::shared_ptr<ScanContext> scan_context,
-                                FragmentIterator fragment_it) {
-  auto task_group = scan_context->TaskGroup();
+    return Status::OK();
+  }
+
+  // There should only be a single writer open for each partition directory at a time
+  util::Mutex mutex_;
+  std::unordered_map<std::string, std::shared_ptr<MutexedWriter>> dir_to_writer_;
+  NextBasenameGenerator next_basename_;
+  std::string base_dir_;
+  const FileSystemDatasetWriteOptions& write_options_;
+};
 
-  base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));
+Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
+                                std::shared_ptr<Scanner> scanner) {
+  auto task_group = scanner->context()->TaskGroup();
 
-  for (const auto& f : partitioning->schema()->fields()) {
+  for (const auto& f : write_options.partitioning->schema()->fields()) {
     if (f->type()->id() == Type::DICTIONARY) {
       return Status::NotImplemented("writing with dictionary partitions");
     }
   }
 
-  int i = 0;
-  for (auto maybe_fragment : fragment_it) {
-    ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
-    auto task = std::make_shared<WriteTask>();
-
-    task->basename = "dat_" + std::to_string(i++) + "." + format->type_name();
-    task->partition_expression = fragment->partition_expression();
-    task->format = format;
-    task->filesystem = filesystem;
-    task->base_dir = base_dir;
-    task->partitioning = partitioning;
-
-    // make a record batch reader which yields from a fragment
-    ARROW_ASSIGN_OR_RAISE(task->batches, FragmentRecordBatchReader::Make(
-                                             std::move(fragment), schema, scan_context));
-    task_group->Append([task] { return task->Execute(); });
+  // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
+  //
+  // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
+  //   any fragments we have before waiting for discovery to complete. This isn't
+  //   currently implemented for FileSystemDataset anyway: ARROW-8613
+  //
+  // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
+  //   writing Fragments which produce scan tasks slowly. No Fragments do this.
+  //
+  // NB: neither of these will have any impact whatsoever on the common case of writing
+  //     an in-memory table to disk.
+  ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, scanner->GetFragments().ToVector());
+  ScanTaskVector scan_tasks;
+  std::vector<const Fragment*> fragment_for_task;
+
+  // Avoid contention with multithreaded readers
+  auto context = std::make_shared<ScanContext>(*scanner->context());
+  context->use_threads = false;
+
+  for (const auto& fragment : fragments) {

Review comment:
       Since we already know all the fragments (and their expressions), can we avoid all the locking and multi-threading logic in WriterSet (IIRC, you only need it to ensure each writer is created once)? That would heavily simplify all of this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org