You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/04 18:37:44 UTC

[GitHub] [arrow] jorisvandenbossche commented on a change in pull request #7869: ARROW-8002: [C++][Dataset][R] Support partitioned dataset writing

jorisvandenbossche commented on a change in pull request #7869:
URL: https://github.com/apache/arrow/pull/7869#discussion_r465242408



##########
File path: cpp/src/arrow/dataset/filter.cc
##########
@@ -1493,5 +1495,191 @@ Result<std::shared_ptr<Expression>> Expression::Deserialize(const Buffer& serial
   return DeserializeImpl{}.FromBuffer(serialized);
 }
 
+// Transform an array of counts to offsets which will divide a ListArray
+// into as many slices as there are counts, each with the corresponding length.
+inline Result<std::shared_ptr<Array>> CountsToOffsets(
+    std::shared_ptr<Int64Array> counts) {
+  Int32Builder offset_builder;
+  RETURN_NOT_OK(offset_builder.Resize(counts->length() + 1));
+  offset_builder.UnsafeAppend(0);
+
+  for (int64_t i = 0; i < counts->length(); ++i) {
+    DCHECK_NE(counts->Value(i), 0);
+    auto next_offset = static_cast<int32_t>(offset_builder[i] + counts->Value(i));
+    offset_builder.UnsafeAppend(next_offset);
+  }
+
+  std::shared_ptr<Array> offsets;
+  RETURN_NOT_OK(offset_builder.Finish(&offsets));
+  return offsets;
+}
+
+// Helper for simultaneous dictionary encoding of multiple arrays.
+//
+// The fused dictionary is the Cartesian product of the individual dictionaries.
+// For example given two arrays A, B where A has unique values ["ex", "why"]
+// and B has unique values [0, 1] the fused dictionary is the set of tuples
+// [["ex", 0], ["ex", 1], ["why", 0], ["why", 1]].

Review comment:
       What's the behaviour for nulls?
   
   (edit: ah, I see that is checked and raises a NotImplemented error for now)

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -154,52 +157,101 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
-    const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options,
-    std::shared_ptr<ScanContext> scan_context) {
-  auto filesystem = plan.filesystem;
-  if (filesystem == nullptr) {
-    filesystem = std::make_shared<fs::LocalFileSystem>();
-  }
+struct WriteTask {
+  Status Execute();
 
-  auto task_group = scan_context->TaskGroup();
-  auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir);
-  auto extension = "." + plan.format->type_name();
-
-  std::vector<std::shared_ptr<FileFragment>> fragments;
-  for (size_t i = 0; i < plan.paths.size(); ++i) {
-    const auto& op = plan.fragment_or_partition_expressions[i];
-    if (op.kind() == WritePlan::FragmentOrPartitionExpression::FRAGMENT) {
-      auto path = partition_base_dir + plan.paths[i] + extension;
-
-      const auto& input_fragment = op.fragment();
-      FileSource dest(path, filesystem);
-
-      ARROW_ASSIGN_OR_RAISE(auto write_task,
-                            plan.format->WriteFragment({path, filesystem}, input_fragment,
-                                                       scan_options, scan_context));
-      task_group->Append([write_task] { return write_task->Execute(); });
-
-      ARROW_ASSIGN_OR_RAISE(
-          auto fragment, plan.format->MakeFragment(
-                             {path, filesystem}, input_fragment->partition_expression()));
-      fragments.push_back(std::move(fragment));
+  std::string basename;

Review comment:
       doc comment for `basename` ?

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -154,52 +157,101 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-Result<std::shared_ptr<FileSystemDataset>> FileSystemDataset::Write(
-    const WritePlan& plan, std::shared_ptr<ScanOptions> scan_options,
-    std::shared_ptr<ScanContext> scan_context) {
-  auto filesystem = plan.filesystem;
-  if (filesystem == nullptr) {
-    filesystem = std::make_shared<fs::LocalFileSystem>();
-  }
+struct WriteTask {
+  Status Execute();
 
-  auto task_group = scan_context->TaskGroup();
-  auto partition_base_dir = fs::internal::EnsureTrailingSlash(plan.partition_base_dir);
-  auto extension = "." + plan.format->type_name();
-
-  std::vector<std::shared_ptr<FileFragment>> fragments;
-  for (size_t i = 0; i < plan.paths.size(); ++i) {
-    const auto& op = plan.fragment_or_partition_expressions[i];
-    if (op.kind() == WritePlan::FragmentOrPartitionExpression::FRAGMENT) {
-      auto path = partition_base_dir + plan.paths[i] + extension;
-
-      const auto& input_fragment = op.fragment();
-      FileSource dest(path, filesystem);
-
-      ARROW_ASSIGN_OR_RAISE(auto write_task,
-                            plan.format->WriteFragment({path, filesystem}, input_fragment,
-                                                       scan_options, scan_context));
-      task_group->Append([write_task] { return write_task->Execute(); });
-
-      ARROW_ASSIGN_OR_RAISE(
-          auto fragment, plan.format->MakeFragment(
-                             {path, filesystem}, input_fragment->partition_expression()));
-      fragments.push_back(std::move(fragment));
+  std::string basename;
+
+  /// The partitioning with which paths will be generated
+  std::shared_ptr<Partitioning> partitioning;
+
+  /// The format in which fragments will be written
+  std::shared_ptr<FileFormat> format;
+
+  /// The FileSystem and base directory into which fragments will be written
+  std::shared_ptr<fs::FileSystem> filesystem;
+  std::string base_dir;
+
+  /// Batches to be written
+  std::shared_ptr<RecordBatchReader> batches;
+
+  /// An Expression already satisfied by every batch to be written
+  std::shared_ptr<Expression> partition_expression;

Review comment:
       What is this used for?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org