You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/11 18:09:08 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

westonpace commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630379347



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {

Review comment:
       Maybe take in `IOContext` instead of `MemoryPool*`?  If you later decide to add support for cancellation it'll save you from having to change the API.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }

Review comment:
       No need to close `owned_sink_`?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);

Review comment:
       Seems a little odd to have two options to control `batch_size`.  I suppose it's a "default" batch size and a "specific for this table" batch size?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -355,7 +370,9 @@ class CSVConverter {
     return header_length + (kQuoteDelimiterCount * schema_->num_fields());
   }
 
-  Status WriteHeader(io::OutputStream* out) {
+  Status WriteHeader() {
+    if (header_written_) return Status::OK();

Review comment:
       Would it be clearer to return `Invalid` here to inform the caller they are doing something odd?  Or is it sometimes hard for the caller to know when the header will be written?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {

Review comment:
       Why isn't it named `CSVWriter`?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
-  CSVConverter(std::shared_ptr<Schema> schema,
-               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
-      : column_populators_(std::move(populators)),
+  CSVConverter(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+               std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool,
+               const WriteOptions& options)
+      : sink_(sink),
+        owned_sink_(std::move(owned_sink)),
+        column_populators_(std::move(populators)),
         offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
         schema_(std::move(schema)),
-        pool_(pool) {}
+        pool_(pool),
+        options_(options) {}
 
-  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+  Status PrepareForContentsWrite() {

Review comment:
       Does `data_buffer_` ever revert back to `nullptr`?  Why isn't it just initialized once at construction?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +420,41 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;
   std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
   std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
   std::shared_ptr<ResizableBuffer> data_buffer_;
   const std::shared_ptr<Schema> schema_;
   MemoryPool* pool_;
+  WriteOptions options_;
+  ipc::WriteStats stats_;
+  bool header_written_ = false;
 };
 
 }  // namespace
 
 Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
                 arrow::io::OutputStream* output) {
-  if (pool == nullptr) {
-    pool = default_memory_pool();
-  }
-  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
-                  CSVConverter::Make(table.schema(), pool));
-  return converter->WriteCSV(table, options, output);
+  ASSIGN_OR_RAISE(auto converter,
+                  CSVConverter::Make(output, nullptr, table.schema(), pool, options));
+  RETURN_NOT_OK(converter->WriteTable(table, /*max_chunksize=*/-1));

Review comment:
       Given that `options` has a `batch_size` it feels odd here that we have to pass in `-1`.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       It seems like in other places where we have the "maybe owned stream" pattern (e.g. arrow::ipc::InputStreamMessageReader) we use overloads to make it a bit clearer that it is one or the other.  It might be good to do that here.

##########
File path: cpp/src/arrow/csv/writer_test.cc
##########
@@ -91,6 +92,26 @@ class TestWriteCSV : public ::testing::TestWithParam<WriterTestParams> {
     ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer, out->Finish());
     return std::string(reinterpret_cast<const char*>(buffer->data()), buffer->size());
   }
+
+  Result<std::string> ToCsvStringWriter(const Table& data, const WriteOptions& options) {

Review comment:
       This method name is confusing to me.  Maybe just `ToCsvStringViaWriter`?  I guess it makes a bit more sense when I look at the test.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -256,5 +267,46 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   return GeneratorFromReader(std::move(reader_fut));
 }
 
+//
+// CsvFileWriter, CsvFileWriteOptions
+//
+
+std::shared_ptr<FileWriteOptions> CsvFileFormat::DefaultWriteOptions() {
+  std::shared_ptr<CsvFileWriteOptions> csv_options(
+      new CsvFileWriteOptions(shared_from_this()));
+  csv_options->options =
+      std::make_shared<csv::WriteOptions>(csv::WriteOptions::Defaults());
+  csv_options->pool = default_memory_pool();

Review comment:
       I'm a little surprised that `pool` is not a property of `FileWriteOptions`.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -256,5 +267,46 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   return GeneratorFromReader(std::move(reader_fut));
 }
 
+//
+// CsvFileWriter, CsvFileWriteOptions
+//
+
+std::shared_ptr<FileWriteOptions> CsvFileFormat::DefaultWriteOptions() {
+  std::shared_ptr<CsvFileWriteOptions> csv_options(

Review comment:
       I suppose you are not using `make_shared` here because you are `using` the constructor of the base type?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3107,6 +3107,33 @@ def test_write_dataset_parquet(tempdir):
         assert meta.format_version == version
 
 
+def test_write_dataset_csv(tempdir):
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))
+    ], names=["f1", "f2", "part"])

Review comment:
       The column here is named `part` which makes me think it is going to be used for partitioning but that isn't actually done.  I'm not sure this is a problem as much as an observation.

##########
File path: python/pyarrow/_csv.pxd
##########
@@ -44,3 +44,11 @@ cdef class ReadOptions(_Weakrefable):
 
     @staticmethod
     cdef ReadOptions wrap(CCSVReadOptions options)
+
+
+cdef class WriteOptions(_Weakrefable):
+    cdef:
+        unique_ptr[CCSVWriteOptions] options

Review comment:
       Why does this need to be a `unique_ptr`?  `CCSVWriteOptions` is pretty trivial.

##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -83,6 +82,37 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
 };
 
+class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
+ public:
+  /// Options passed to csv::MakeCSVWriter. use_threads is ignored
+  std::shared_ptr<csv::WriteOptions> options;

Review comment:
       `options` is a little ambiguous.  Perhaps `format_options` or `csv_options` or `writer_options`?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1747,8 +1749,15 @@ cdef class CsvFileFormat(FileFormat):
         FileFormat.init(self, sp)
         self.csv_format = <CCsvFileFormat*> sp.get()
 
-    def make_write_options(self):
-        raise NotImplemented("writing CSV datasets")
+    def make_write_options(self, WriteOptions options=None,

Review comment:
       It is kind of confusing to have a method named `make_write_options` that takes in an instance of `WriteOptions`.  Perhaps in C++ it wouldn't be so bad, but for Python I think we might want something more understandable.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       Although I suppose it is hidden behind `MakeCSVWriter`, which I suppose raises the question: "Why have the non-owned option at all?"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org