Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/03 19:01:18 UTC

[GitHub] [arrow] lidavidm opened a new pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

lidavidm opened a new pull request #10230:
URL: https://github.com/apache/arrow/pull/10230


   This refactors the CSV write support to expose an explicit CSV writer class, and adds Python bindings and Datasets support.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631167761



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1819,6 +1824,28 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
                                         self.read_options)
 
 
+cdef class CsvFileWriteOptions(FileWriteOptions):

Review comment:
       same nit on Csv vs CSV 







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663909304



##########
File path: python/pyarrow/_csv.pyx
##########
@@ -1010,15 +1015,44 @@ def write_csv(data, output_file, write_options=None,
 
     get_writer(output_file, &stream)
     c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    c_write_options.io_context = CIOContext(c_memory_pool)
     if isinstance(data, RecordBatch):
         batch = pyarrow_unwrap_batch(data).get()
         with nogil:
-            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(batch), c_write_options, stream.get()))
     elif isinstance(data, Table):
         table = pyarrow_unwrap_table(data).get()
         with nogil:
-            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(table), c_write_options, stream.get()))
     else:
         raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
+
+
+cdef class CSVWriter(_CRecordBatchWriter):
+    """Writer to create a CSV file.
+
+    Parameters
+    ----------
+    sink: string, path, pyarrow.OutputStream or file-like object
+        The location where to write the CSV data.
+    schema: pyarrow.Schema
+        The schema of the data to be written.
+    write_options: pyarrow.csv.WriteOptions
+        Options to configure writing the CSV data.
+    memory_pool: MemoryPool, optional
+        Pool for temporary allocations.
+    """
+
+    def __init__(self, sink, Schema schema, *,
+                 WriteOptions write_options=None, MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[COutputStream] c_stream
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            CCSVWriteOptions c_write_options
+            CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+        _get_write_options(write_options, &c_write_options)
+        c_write_options.io_context = CIOContext(c_memory_pool)

Review comment:
       I would say yes, since it wraps up a memory pool, thread pool, and cancellation token all in one.
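
The signature change in this diff (dropping the separate memory-pool argument from `WriteCSV` and setting `io_context` on the options instead) can be illustrated with a small stand-alone Python sketch. Everything in it is hypothetical: `IOContextSketch`, `WriteOptionsSketch`, and `write_rows` are stand-ins invented for illustration, not Arrow or pyarrow APIs, and a plain list plays the role of the memory pool.

```python
import io
import threading
from concurrent.futures import Executor
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class IOContextSketch:
    """Hypothetical bundle of the resources an I/O call may need."""
    pool: list = field(default_factory=list)   # stand-in for a memory pool
    executor: Optional[Executor] = None         # stand-in for a thread pool
    stop_token: threading.Event = field(default_factory=threading.Event)


@dataclass
class WriteOptionsSketch:
    """Hypothetical write options that carry the context with them."""
    include_header: bool = True
    io_context: IOContextSketch = field(default_factory=IOContextSketch)


def write_rows(rows, header, options, sink):
    """Write rows as CSV; all resources come from options.io_context."""
    ctx = options.io_context
    if options.include_header:
        sink.write(",".join(header) + "\n")
    for row in rows:
        if ctx.stop_token.is_set():        # cooperative cancellation
            raise RuntimeError("write cancelled")
        line = ",".join(str(v) for v in row) + "\n"
        ctx.pool.append(len(line))         # pretend to account for a temporary
        sink.write(line)


sink = io.StringIO()
options = WriteOptionsSketch()
write_rows([(1, "a"), (2, "b")], ["id", "name"], options, sink)
```

The point of the sketch is only that the write function needs one fewer parameter once the options object carries the context, which is the shape of the change in the quoted hunk.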




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630423088



##########
File path: python/pyarrow/_csv.pxd
##########
@@ -44,3 +44,11 @@ cdef class ReadOptions(_Weakrefable):
 
     @staticmethod
     cdef ReadOptions wrap(CCSVReadOptions options)
+
+
+cdef class WriteOptions(_Weakrefable):
+    cdef:
+        unique_ptr[CCSVWriteOptions] options

Review comment:
       Mostly for consistency with the other options, and in case we add things to WriteOptions that would make it a non-standard layout type, in which case Cython will generate a lot of compiler warnings as it relies on sizeof.







[GitHub] [arrow] pitrou commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-874149785


   Thanks for the updates @lidavidm !





[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630422293



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       The WriteCSV helpers in writer.h actually use the non-owned version - for completeness I should introduce an overload of MakeCSVWriter that exposes that as well.







[GitHub] [arrow] pitrou closed pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10230:
URL: https://github.com/apache/arrow/pull/10230


   





[GitHub] [arrow] nealrichardson commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
nealrichardson commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-832225706


   There's probably a very small amount of wiring to propagate this up to the R `write_dataset()` function; up to you if you want to handle it here or make another JIRA for it.





[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-832283894


   I threw in R support and found & fixed a bug with scanning CSV datasets with manually-specified names.





[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663309666



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,76 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVWriterImpl : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVWriterImpl>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, const WriteOptions& options) {
+    RETURN_NOT_OK(options.Validate());
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
-      ASSIGN_OR_RAISE(populators[col],
-                      MakePopulator(*schema->field(col), end_char, pool));
+      ASSIGN_OR_RAISE(populators[col], MakePopulator(*schema->field(col), end_char,
+                                                     options.io_context.pool()));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVWriterImpl>(new CSVWriterImpl(

Review comment:
       nit: std::make_shared? 







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631166917



##########
File path: python/pyarrow/_csv.pyx
##########
@@ -946,15 +951,44 @@ def write_csv(data, output_file, write_options=None,
 
     get_writer(output_file, &stream)
     c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    c_write_options.io_context = CIOContext(c_memory_pool)
     if isinstance(data, RecordBatch):
         batch = pyarrow_unwrap_batch(data).get()
         with nogil:
-            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(batch), c_write_options, stream.get()))
     elif isinstance(data, Table):
         table = pyarrow_unwrap_table(data).get()
         with nogil:
-            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(table), c_write_options, stream.get()))
     else:
         raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
+
+
+cdef class CsvWriter(_CRecordBatchWriter):

Review comment:
       nit: as much as I appreciate Csv naming convention I think CSV is used everywhere else?







[GitHub] [arrow] github-actions[bot] commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-831469241


   https://issues.apache.org/jira/browse/ARROW-12512





[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630431388



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }

Review comment:
       The IPC reader doesn't do this either, oddly. I guess the sink is not 'exclusively owned' in the Rust sense; the shared_ptr merely keeps it alive. (That does raise the question of what the point is: either you're the only one keeping it alive, so you should close it, or you aren't, and you don't need a shared_ptr. I would guess a strong reference is simply less of a footgun than a potentially dangling one.)
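
To make the lifecycle in this diff easier to follow, here is a minimal Python sketch of the same shape: the header is written once at construction, each batch is sliced by `batch_size`, a counter tracks the slices written, and `close()` deliberately leaves the sink open. The class and its names (`CsvWriterSketch`, `write_batch`) are hypothetical stand-ins, not the Arrow implementation, and rows are plain tuples rather than record batches.

```python
import io


class CsvWriterSketch:
    """Hypothetical writer mirroring the lifecycle discussed above."""

    def __init__(self, sink, column_names, batch_size=2, include_header=True):
        self._sink = sink               # kept alive, but not exclusively owned
        self._batch_size = batch_size
        self.num_record_batches = 0
        if include_header:
            # The header goes out once, when the writer is created.
            self._sink.write(",".join(column_names) + "\n")

    def write_batch(self, rows):
        # Slice the input so at most batch_size rows are formatted at a time.
        for start in range(0, len(rows), self._batch_size):
            chunk = rows[start:start + self._batch_size]
            text = "".join(",".join(str(v) for v in row) + "\n" for row in chunk)
            self._sink.write(text)
            self.num_record_batches += 1

    def close(self):
        # Intentionally does not close the sink; the caller decides when to.
        pass


sink = io.StringIO()
writer = CsvWriterSketch(sink, ["id", "name"])
writer.write_batch([(1, "a"), (2, "b"), (3, "c")])
writer.close()
sink.write("# trailer written by the caller\n")  # sink is still usable
```

In this sketch the caller can keep writing to the sink after `close()`, which is the behaviour the comment describes: the writer holds a reference to the sink without taking responsibility for closing it.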







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630423781



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1747,8 +1749,15 @@ cdef class CsvFileFormat(FileFormat):
         FileFormat.init(self, sp)
         self.csv_format = <CCsvFileFormat*> sp.get()
 
-    def make_write_options(self):
-        raise NotImplemented("writing CSV datasets")
+    def make_write_options(self, WriteOptions options=None,

Review comment:
       I could have it take **kwargs which get forwarded to csv.WriteOptions; now that I look, that's what ParquetFileFormat does.
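
A rough Python sketch of the `**kwargs` forwarding idea mentioned here. `WriteOptionsSketch` and `CsvFileFormatSketch` are hypothetical stand-ins, not the pyarrow classes; the two option fields are borrowed from the diffs quoted in this thread (`include_header`, `batch_size`), and the default values are placeholders.

```python
from dataclasses import dataclass


@dataclass
class WriteOptionsSketch:
    """Hypothetical stand-in for a CSV write-options object."""
    include_header: bool = True
    batch_size: int = 1024   # placeholder default


class CsvFileFormatSketch:
    """Hypothetical stand-in for a dataset file format."""

    def make_write_options(self, **kwargs):
        # Forward every keyword to the options constructor, so a misspelled
        # or unsupported option fails there with a TypeError.
        return WriteOptionsSketch(**kwargs)


fmt = CsvFileFormatSketch()
opts = fmt.make_write_options(include_header=False, batch_size=64)
```

One consequence of forwarding is that the format class does not need to repeat the list of supported options; validation stays in one place, the options constructor.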







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631239611



##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1819,6 +1824,28 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
                                         self.read_options)
 
 
+cdef class CsvFileWriteOptions(FileWriteOptions):

Review comment:
       Unfortunately in the context of datasets (and only datasets) all other classes already use Csv.

##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -83,6 +82,35 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
 };
 
+class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
+ public:
+  /// Options passed to csv::MakeCSVWriter. use_threads is ignored

Review comment:
       I copied this from the equivalent IPC struct - it doesn't apply here since there's no such parameter of course.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +415,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;

Review comment:
       I agree it seems weird, but both the IPC and Parquet writers use shared_ptr for this.







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630466154



##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -256,5 +267,46 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   return GeneratorFromReader(std::move(reader_fut));
 }
 
+//
+// CsvFileWriter, CsvFileWriteOptions
+//
+
+std::shared_ptr<FileWriteOptions> CsvFileFormat::DefaultWriteOptions() {
+  std::shared_ptr<CsvFileWriteOptions> csv_options(

Review comment:
       Ah, it's because it's a protected constructor, and make_shared only works with a public constructor.







[GitHub] [arrow] pitrou commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r662452658



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }

Review comment:
       Right, we don't close the sink in other "writer" classes either. This is more flexible, though of course not very useful in the general case.







[GitHub] [arrow] emkornfield commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-873337204


   A few random nits, but the core C++ looks OK to me.  





[GitHub] [arrow] emkornfield commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-839886411


   Took a quick pass through; seems OK to me (I didn't look at the R stuff at all), and I agree with Weston's comments.





[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631165033



##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -83,6 +82,35 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
 };
 
+class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
+ public:
+  /// Options passed to csv::MakeCSVWriter. use_threads is ignored

Review comment:
       Is `use_threads` used elsewhere?  The way the code is structured, threads could be used for the casts, so if it is important we might want to file a follow-up JIRA.







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631172361



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
-  CSVConverter(std::shared_ptr<Schema> schema,
-               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
-      : column_populators_(std::move(populators)),
+  CSVConverter(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+               std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool,
+               const WriteOptions& options)
+      : sink_(sink),
+        owned_sink_(std::move(owned_sink)),
+        column_populators_(std::move(populators)),
         offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
         schema_(std::move(schema)),
-        pool_(pool) {}
+        pool_(pool),
+        options_(options) {}
 
-  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+  Status PrepareForContentsWrite() {

Review comment:
       I think at one point I might have been using it as a signal to see if the header was written, but I can't really remember.  I agree it is strange and I don't have a strong justification for this pattern.  It might have been to avoid having to make a factory function for the private class.







[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-839952344


   Thanks for the reviews. I think I've addressed all feedback, minus the shared_ptr<OutputStream>: while this is weird, it is the pattern used by IPC and Parquet as well, and I think we may as well be consistent across the formats. (Also, IPC exposes both the output-owning and output-borrowing APIs too, even though it expects the caller to close the stream in both cases.)





[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-832156102


   @jorisvandenbossche I missed that: CsvFileFormat.make_write_options in Python needs to be updated as well.





[GitHub] [arrow] pitrou commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r662454305



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,36 +416,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;
   std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
   std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
   std::shared_ptr<ResizableBuffer> data_buffer_;
   const std::shared_ptr<Schema> schema_;
-  MemoryPool* pool_;
+  WriteOptions options_;

Review comment:
       Nit: `const`?







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630421629



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);

Review comment:
       There's a bit of an impedance mismatch because I elected to reuse the ipc::RecordBatchWriter interface, which has parameters like that in the API. I could at least introduce an overload that doesn't require specifying it for convenience.
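
A convenience overload of the kind mentioned here could look roughly like the sketch below. `ToyOptions` and `ToyWriter` are hypothetical stand-ins written for illustration, not the Arrow classes; only the `max_chunksize > 0 ? max_chunksize : batch_size` fallback is taken from the diff above.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for csv::WriteOptions; only batch_size matters here.
struct ToyOptions {
  int64_t batch_size = 1024;
};

// Hypothetical stand-in for the writer. Instead of writing anything, it
// reports how many chunks a table of num_rows rows would be split into.
class ToyWriter {
 public:
  explicit ToyWriter(ToyOptions options) : options_(options) {}

  // Interface-style method: a non-positive max_chunksize means
  // "fall back to the batch size from the options".
  int64_t WriteTable(int64_t num_rows, int64_t max_chunksize) {
    last_chunksize_ = max_chunksize > 0 ? max_chunksize : options_.batch_size;
    // Round up so that a partial final chunk is counted.
    return (num_rows + last_chunksize_ - 1) / last_chunksize_;
  }

  // Convenience overload: callers that are happy with options_.batch_size
  // do not have to pass a sentinel value.
  int64_t WriteTable(int64_t num_rows) {
    return WriteTable(num_rows, /*max_chunksize=*/-1);
  }

  int64_t last_chunksize() const { return last_chunksize_; }

 private:
  ToyOptions options_;
  int64_t last_chunksize_ = 0;
};
```

With this shape, the interface keeps its two-argument method, while ordinary callers use the one-argument form and never see the `-1` sentinel.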







[GitHub] [arrow] pitrou commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r662454081



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +415,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;

Review comment:
       Well, except that in Python any object can be shared, even if it's logically "owned" by something.







[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-871418456


   Rebased and fixed conflicts here.





[GitHub] [arrow] westonpace commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r630379347



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {

Review comment:
       Maybe take in `IOContext` instead of `MemoryPool*`?  If you later decide to add support for cancellation it'll save you from having to change the API.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }

Review comment:
       No need to close `owned_sink_`?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);

Review comment:
       Seems a little odd to have two options to control `batch_size`.  I suppose it's a "default" batch size and a "specific for this table" batch size?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -355,7 +370,9 @@ class CSVConverter {
     return header_length + (kQuoteDelimiterCount * schema_->num_fields());
   }
 
-  Status WriteHeader(io::OutputStream* out) {
+  Status WriteHeader() {
+    if (header_written_) return Status::OK();

Review comment:
       Would it be clearer to return `Invalid` here to inform the caller they are doing something odd?  Or is it sometimes hard for the caller to know when the header will be written?
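
The two behaviors being weighed in this comment can be compared in a small standalone sketch. `ToyStatus` and `ToyHeaderWriter` are hypothetical names invented for illustration (this is not `arrow::Status` or the PR's class); the `strict` flag exists only to show both options side by side.

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical miniature status type; not arrow::Status.
enum class ToyStatus { kOk, kInvalid };

class ToyHeaderWriter {
 public:
  // `strict` selects what a repeated WriteHeader call does.
  ToyHeaderWriter(std::ostream* sink, bool strict) : sink_(sink), strict_(strict) {}

  ToyStatus WriteHeader(const std::string& header) {
    if (header_written_) {
      // Lenient: silently succeed, as in the diff above.
      // Strict: report the repeated call to the caller.
      return strict_ ? ToyStatus::kInvalid : ToyStatus::kOk;
    }
    *sink_ << header << '\n';
    header_written_ = true;
    return ToyStatus::kOk;
  }

 private:
  std::ostream* sink_;
  bool strict_;
  bool header_written_ = false;
};
```

In both variants the header reaches the sink exactly once; the only difference is whether the second call is reported as an error.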

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {

Review comment:
       Why isn't it named `CSVWriter`?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
-  CSVConverter(std::shared_ptr<Schema> schema,
-               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
-      : column_populators_(std::move(populators)),
+  CSVConverter(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+               std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool,
+               const WriteOptions& options)
+      : sink_(sink),
+        owned_sink_(std::move(owned_sink)),
+        column_populators_(std::move(populators)),
         offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
         schema_(std::move(schema)),
-        pool_(pool) {}
+        pool_(pool),
+        options_(options) {}
 
-  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+  Status PrepareForContentsWrite() {

Review comment:
       Does `data_buffer_` ever revert back to `nullptr`?  Why isn't it just initialized once at construction?

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +420,41 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;
   std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
   std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
   std::shared_ptr<ResizableBuffer> data_buffer_;
   const std::shared_ptr<Schema> schema_;
   MemoryPool* pool_;
+  WriteOptions options_;
+  ipc::WriteStats stats_;
+  bool header_written_ = false;
 };
 
 }  // namespace
 
 Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
                 arrow::io::OutputStream* output) {
-  if (pool == nullptr) {
-    pool = default_memory_pool();
-  }
-  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
-                  CSVConverter::Make(table.schema(), pool));
-  return converter->WriteCSV(table, options, output);
+  ASSIGN_OR_RAISE(auto converter,
+                  CSVConverter::Make(output, nullptr, table.schema(), pool, options));
+  RETURN_NOT_OK(converter->WriteTable(table, /*max_chunksize=*/-1));

Review comment:
       Given that `options` has a `batch_size` it feels odd here that we have to pass in `-1`.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       It seems like other places we have the "maybe owned stream" pattern (e.g. arrow::ipc::InputStreamMessageReader) we use overloads to make it a bit clearer that it is one or the other.  It might be good to do that here.
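
As a rough illustration of the overload-based variant suggested here, the sketch below uses `std::ostream` in place of `io::OutputStream`. `ToySinkWriter` is a hypothetical class; the two `Make` overloads are an assumption about how such an API could be shaped, not the signatures used in Arrow.

```cpp
#include <cassert>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <utility>

class ToySinkWriter {
 public:
  // Borrowing overload: the caller must keep the stream alive.
  static std::unique_ptr<ToySinkWriter> Make(std::ostream* sink) {
    return std::unique_ptr<ToySinkWriter>(new ToySinkWriter(sink, nullptr));
  }

  // Owning overload: the writer shares ownership of the stream.
  static std::unique_ptr<ToySinkWriter> Make(std::shared_ptr<std::ostream> sink) {
    // Take the raw pointer first so the later std::move cannot interfere.
    std::ostream* raw = sink.get();
    return std::unique_ptr<ToySinkWriter>(new ToySinkWriter(raw, std::move(sink)));
  }

  void Write(const std::string& row) { *sink_ << row << '\n'; }
  bool owns_sink() const { return owned_sink_ != nullptr; }

 private:
  // The "raw pointer plus maybe-null shared_ptr" pair stays an internal detail.
  ToySinkWriter(std::ostream* sink, std::shared_ptr<std::ostream> owned_sink)
      : sink_(sink), owned_sink_(std::move(owned_sink)) {}

  std::ostream* sink_;
  std::shared_ptr<std::ostream> owned_sink_;
};
```

Callers then pick exactly one overload, so there is no way to pass a raw pointer and a shared pointer that disagree with each other.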

##########
File path: cpp/src/arrow/csv/writer_test.cc
##########
@@ -91,6 +92,26 @@ class TestWriteCSV : public ::testing::TestWithParam<WriterTestParams> {
     ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer, out->Finish());
     return std::string(reinterpret_cast<const char*>(buffer->data()), buffer->size());
   }
+
+  Result<std::string> ToCsvStringWriter(const Table& data, const WriteOptions& options) {

Review comment:
       This method name is confusing to me.  Maybe just `ToCsvStringViaWriter`?  I guess it makes a bit more sense when I look at the test.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -256,5 +267,46 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   return GeneratorFromReader(std::move(reader_fut));
 }
 
+//
+// CsvFileWriter, CsvFileWriteOptions
+//
+
+std::shared_ptr<FileWriteOptions> CsvFileFormat::DefaultWriteOptions() {
+  std::shared_ptr<CsvFileWriteOptions> csv_options(
+      new CsvFileWriteOptions(shared_from_this()));
+  csv_options->options =
+      std::make_shared<csv::WriteOptions>(csv::WriteOptions::Defaults());
+  csv_options->pool = default_memory_pool();

Review comment:
       I'm a little surprised that `pool` is not a property of `FileWriteOptions`.

##########
File path: cpp/src/arrow/dataset/file_csv.cc
##########
@@ -256,5 +267,46 @@ Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
   return GeneratorFromReader(std::move(reader_fut));
 }
 
+//
+// CsvFileWriter, CsvFileWriteOptions
+//
+
+std::shared_ptr<FileWriteOptions> CsvFileFormat::DefaultWriteOptions() {
+  std::shared_ptr<CsvFileWriteOptions> csv_options(

Review comment:
       I suppose you are not using `make_shared` here because you are `using` the constructor of the base type?
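
The general C++ constraint behind this question can be shown with a standalone sketch. All `Toy*` names are hypothetical, and the sketch assumes the options constructor is non-public with the format class declared as a friend; it uses an explicit protected constructor, on the assumption that a constructor inherited through `using` keeps the base constructor's access and so has the same consequence for `std::make_shared`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

class ToyBaseOptions {
 public:
  virtual ~ToyBaseOptions() = default;
  const std::string& format_name() const { return format_name_; }

 protected:
  explicit ToyBaseOptions(std::string format_name)
      : format_name_(std::move(format_name)) {}

 private:
  std::string format_name_;
};

class ToyCsvOptions : public ToyBaseOptions {
 public:
  int batch_size = 1024;

 protected:
  // Non-public constructor: only friends and derived classes may construct.
  explicit ToyCsvOptions(std::string format_name)
      : ToyBaseOptions(std::move(format_name)) {}

  friend class ToyCsvFormat;
};

class ToyCsvFormat {
 public:
  std::shared_ptr<ToyBaseOptions> DefaultWriteOptions() {
    // std::make_shared<ToyCsvOptions>("csv") would not compile here: the
    // object would be constructed inside the standard library, which is not
    // a friend of ToyCsvOptions. A plain `new` runs in this (friend) scope.
    return std::shared_ptr<ToyCsvOptions>(new ToyCsvOptions("csv"));
  }
};
```

The cost of the `new`-based form is a separate allocation for the control block, which is usually negligible for an options object.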

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3107,6 +3107,33 @@ def test_write_dataset_parquet(tempdir):
         assert meta.format_version == version
 
 
+def test_write_dataset_csv(tempdir):
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))
+    ], names=["f1", "f2", "part"])

Review comment:
       The column here is named `part` which makes me think it is going to be used for partitioning but that isn't actually done.  I'm not sure this is a problem as much as an observation.

##########
File path: python/pyarrow/_csv.pxd
##########
@@ -44,3 +44,11 @@ cdef class ReadOptions(_Weakrefable):
 
     @staticmethod
     cdef ReadOptions wrap(CCSVReadOptions options)
+
+
+cdef class WriteOptions(_Weakrefable):
+    cdef:
+        unique_ptr[CCSVWriteOptions] options

Review comment:
       Why does this need to be a `unique_ptr`?  `CCSVWriteOptions` is pretty trivial.

##########
File path: cpp/src/arrow/dataset/file_csv.h
##########
@@ -83,6 +82,37 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
 };
 
+class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
+ public:
+  /// Options passed to csv::MakeCSVWriter. use_threads is ignored
+  std::shared_ptr<csv::WriteOptions> options;

Review comment:
       `options` is a little ambiguous.  Perhaps `format_options` or `csv_options` or `writer_options`?

##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1747,8 +1749,15 @@ cdef class CsvFileFormat(FileFormat):
         FileFormat.init(self, sp)
         self.csv_format = <CCsvFileFormat*> sp.get()
 
-    def make_write_options(self):
-        raise NotImplemented("writing CSV datasets")
+    def make_write_options(self, WriteOptions options=None,

Review comment:
       It is kind of confusing to have a method named `make_write_options` that takes an instance of `WriteOptions`.  Perhaps in C++ it wouldn't be so bad, but for Python I think we might want something more understandable.

##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       Although I suppose it is hidden behind `MakeCSVWriter`, which raises the question: why have the non-owned option at all?







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663310067



##########
File path: cpp/src/arrow/csv/writer.h
##########
@@ -37,11 +40,34 @@ namespace csv {
 /// \brief Converts table to a CSV and writes the results to output.
 /// Experimental
 ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options,
-                             MemoryPool* pool, arrow::io::OutputStream* output);
+                             arrow::io::OutputStream* output);
 /// \brief Converts batch to CSV and writes the results to output.
 /// Experimental
 ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                             MemoryPool* pool, arrow::io::OutputStream* output);
+                             arrow::io::OutputStream* output);
+
+/// \brief Create a new CSV writer. User is responsible for closing the
+/// actual OutputStream.
+///
+/// \param[in] sink output stream to write to
+/// \param[in] schema the schema of the record batches to be written
+/// \param[in] options options for serialization
+/// \return Result<std::shared_ptr<RecordBatchWriter>>
+ARROW_EXPORT
+Result<std::shared_ptr<ipc::RecordBatchWriter>> MakeCSVWriter(
+    std::shared_ptr<io::OutputStream> sink, const std::shared_ptr<Schema>& schema,
+    const WriteOptions& options = WriteOptions::Defaults());
+
+/// \brief Create a new CSV writer.
+///
+/// \param[in] sink output stream to write to

Review comment:
       Also note that ownership is not taken here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r662453585



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
-  CSVConverter(std::shared_ptr<Schema> schema,
-               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
-      : column_populators_(std::move(populators)),
+  CSVConverter(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+               std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool,
+               const WriteOptions& options)
+      : sink_(sink),
+        owned_sink_(std::move(owned_sink)),
+        column_populators_(std::move(populators)),
         offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
         schema_(std::move(schema)),
-        pool_(pool) {}
+        pool_(pool),
+        options_(options) {}
 
-  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+  Status PrepareForContentsWrite() {

Review comment:
       Sounds good to me.







[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-843273563


   Just to follow up: APIs like FileSystem return `shared_ptr<OutputStream>`, so it would be very annoying to take `unique_ptr`. We could take only `OutputStream*`, but IMO even if the caller is supposed to keep the pointer alive, it's safer to offer the option to take a `shared_ptr` by default to minimize mistakes.
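
   A minimal sketch of the two-overload idea, using only the standard library. `Sink`, `LineWriter`, and `DemoBothOverloads` are hypothetical names made up for illustration; they are not Arrow classes, and this is not the code in the PR. The writer always writes through a raw pointer and optionally holds a `shared_ptr` to keep the stream alive.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for io::OutputStream; it only records what was written.
class Sink {
 public:
  void Write(const std::string& data) { contents_ += data; }
  const std::string& contents() const { return contents_; }

 private:
  std::string contents_;
};

// Hypothetical writer: always writes through sink_, and optionally keeps the
// stream alive through owned_sink_.
class LineWriter {
 public:
  // Non-owning: the caller must keep *sink alive for the writer's lifetime.
  explicit LineWriter(Sink* sink) : sink_(sink) {}

  // Owning: the writer shares ownership, so the stream cannot go away early.
  // sink_ is declared before owned_sink_, so get() runs before the move.
  explicit LineWriter(std::shared_ptr<Sink> sink)
      : sink_(sink.get()), owned_sink_(std::move(sink)) {}

  void WriteLine(const std::string& line) { sink_->Write(line + "\n"); }

 private:
  Sink* sink_;
  std::shared_ptr<Sink> owned_sink_;  // null in the non-owning case
};

// Exercises both overloads and returns everything that was written.
inline std::string DemoBothOverloads() {
  Sink stack_sink;  // outlives the writer below
  LineWriter borrowed(&stack_sink);
  borrowed.WriteLine("a,b");

  auto shared_sink = std::make_shared<Sink>();
  Sink* observer = shared_sink.get();
  LineWriter owning(std::move(shared_sink));  // caller gives up its reference
  owning.WriteLine("1,2");
  return stack_sink.contents() + observer->contents();
}
```

   With the owning overload the caller can drop its own reference right away, which is the mistake-minimizing behavior argued for above; the raw-pointer overload stays available for streams whose lifetime is managed elsewhere.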





[GitHub] [arrow] jorisvandenbossche commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-832153963


   Does this also enable writing CSV with the dataset API in Python (`write_dataset(..., format="csv")`)?





[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-861680979


   Rebased/fixed conflicts.





[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663309828



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,76 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVWriterImpl : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVWriterImpl>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, const WriteOptions& options) {
+    RETURN_NOT_OK(options.Validate());
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
-      ASSIGN_OR_RAISE(populators[col],
-                      MakePopulator(*schema->field(col), end_char, pool));
+      ASSIGN_OR_RAISE(populators[col], MakePopulator(*schema->field(col), end_char,
+                                                     options.io_context.pool()));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVWriterImpl>(new CSVWriterImpl(
+        sink, std::move(owned_sink), std::move(schema), std::move(populators), options));
+    RETURN_NOT_OK(writer->PrepareForContentsWrite());
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
  private:
-  CSVConverter(std::shared_ptr<Schema> schema,
-               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
-      : column_populators_(std::move(populators)),
-        offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
+  CSVWriterImpl(io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,

Review comment:
       I guess this would need to be public to use `std::make_shared`.
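
       A small sketch of why that is, assuming a hypothetical `Widget` class (not part of Arrow) with the same private-constructor-plus-factory shape as the diff above. `std::make_shared` constructs the object from inside the standard library, which has no access to a private constructor, so the factory falls back to `new` wrapped in a `shared_ptr`.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical class with a private constructor and a static factory.
class Widget {
 public:
  static std::shared_ptr<Widget> Make(int value) {
    // std::make_shared<Widget>(value) would not compile here: the constructor
    // call happens inside the standard library, which cannot access it.
    return std::shared_ptr<Widget>(new Widget(value));
  }

  int value() const { return value_; }

 private:
  explicit Widget(int value) : value_(value) {}
  int value_;
};

// Outside the class, the private constructor is not usable.
static_assert(!std::is_constructible<Widget, int>::value,
              "Widget(int) is private");
```

       The cost of the `new` form is a separate allocation for the control block; making the constructor public would allow `std::make_shared` but would also let callers bypass the factory.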







[GitHub] [arrow] lidavidm commented on pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#issuecomment-831936440


   @emkornfield would you be free to take a look (at least the CSV side, if not the Datasets side)? No rush of course.





[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663310270



##########
File path: python/pyarrow/_csv.pyx
##########
@@ -1010,15 +1015,44 @@ def write_csv(data, output_file, write_options=None,
 
     get_writer(output_file, &stream)
     c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    c_write_options.io_context = CIOContext(c_memory_pool)
     if isinstance(data, RecordBatch):
         batch = pyarrow_unwrap_batch(data).get()
         with nogil:
-            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(batch), c_write_options, stream.get()))
     elif isinstance(data, Table):
         table = pyarrow_unwrap_table(data).get()
         with nogil:
-            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
-                                  stream.get()))
+            check_status(WriteCSV(deref(table), c_write_options, stream.get()))
     else:
         raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
+
+
+cdef class CSVWriter(_CRecordBatchWriter):
+    """Writer to create a CSV file.
+
+    Parameters
+    ----------
+    sink: string, path, pyarrow.OutputStream or file-like object
+        The location where to write the CSV data.
+    schema: pyarrow.Schema
+        The schema of the data to be written.
+    write_options: pyarrow.csv.WriteOptions
+        Options to configure writing the CSV data.
+    memory_pool: MemoryPool, optional
+        Pool for temporary allocations.
+    """
+
+    def __init__(self, sink, Schema schema, *,
+                 WriteOptions write_options=None, MemoryPool memory_pool=None):
+        cdef:
+            shared_ptr[COutputStream] c_stream
+            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
+            CCSVWriteOptions c_write_options
+            CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+        _get_write_options(write_options, &c_write_options)
+        c_write_options.io_context = CIOContext(c_memory_pool)

Review comment:
       `IOContext` is new to me in general.  Should new APIs take that instead of a `MemoryPool`?







[GitHub] [arrow] lidavidm commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r663040601



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,36 +416,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;
   std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
   std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
   std::shared_ptr<ResizableBuffer> data_buffer_;
   const std::shared_ptr<Schema> schema_;
-  MemoryPool* pool_;
+  WriteOptions options_;

Review comment:
       I added the const.







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631162520



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +415,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;

Review comment:
       `shared_ptr` seems strange in general for an `OutputStream`, which for the most part should have only one owner.







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631170861



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -282,65 +283,79 @@ Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char
   return std::unique_ptr<ColumnPopulator>(factory.populator);
 }
 
-class CSVConverter {
+class CSVConverter : public ipc::RecordBatchWriter {
  public:
-  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
-                                                    MemoryPool* pool) {
+  static Result<std::shared_ptr<CSVConverter>> Make(
+      io::OutputStream* sink, std::shared_ptr<io::OutputStream> owned_sink,
+      std::shared_ptr<Schema> schema, MemoryPool* pool, const WriteOptions& options) {
+    if (!pool) pool = default_memory_pool();
     std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
     for (int col = 0; col < schema->num_fields(); col++) {
       char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
       ASSIGN_OR_RAISE(populators[col],
                       MakePopulator(*schema->field(col), end_char, pool));
     }
-    return std::unique_ptr<CSVConverter>(
-        new CSVConverter(std::move(schema), std::move(populators), pool));
+    auto writer = std::shared_ptr<CSVConverter>(
+        new CSVConverter(sink, std::move(owned_sink), std::move(schema),
+                         std::move(populators), pool, options));
+    if (options.include_header) {
+      RETURN_NOT_OK(writer->PrepareForContentsWrite());
+      RETURN_NOT_OK(writer->WriteHeader());
+    }
+    return writer;
   }
 
-  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
-                  io::OutputStream* out) {
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
-    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(PrepareForContentsWrite());
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options_.batch_size);
     for (auto maybe_slice : iterator) {
       ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      stats_.num_record_batches++;
     }
     return Status::OK();
   }
 
-  Status WriteCSV(const Table& table, const WriteOptions& options,
-                  io::OutputStream* out) {
+  Status WriteTable(const Table& table, int64_t max_chunksize) override {
     TableBatchReader reader(table);
-    reader.set_chunksize(options.batch_size);
-    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    reader.set_chunksize(max_chunksize > 0 ? max_chunksize : options_.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite());
     std::shared_ptr<RecordBatch> batch;
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(sink_->Write(data_buffer_));
       RETURN_NOT_OK(reader.ReadNext(&batch));
+      stats_.num_record_batches++;
     }
 
     return Status::OK();
   }
 
+  Status Close() override { return Status::OK(); }

Review comment:
       In other places I can think of (`Buffer` comes to mind), this is handled by passing a `unique_ptr` instead of a `shared_ptr`.
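
       For reference, a standard-library-only sketch of how the two smart pointers convert. `Stream`, `TakeShared`, and `DemoConversion` are hypothetical names for illustration, not Arrow types. A `unique_ptr` can be handed to an API that takes a `shared_ptr`, but a caller who already holds a `shared_ptr` cannot satisfy an API that takes a `unique_ptr`.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>

struct Stream { int id; };  // hypothetical stand-in for an output stream

// A unique_ptr converts implicitly into a shared_ptr...
static_assert(std::is_convertible<std::unique_ptr<Stream>,
                                  std::shared_ptr<Stream>>::value,
              "unique_ptr -> shared_ptr is allowed");
// ...but a shared_ptr cannot be turned back into a unique_ptr.
static_assert(!std::is_convertible<std::shared_ptr<Stream>,
                                   std::unique_ptr<Stream>>::value,
              "shared_ptr -> unique_ptr is not allowed");

// Hypothetical consumer taking shared ownership of the stream.
inline int TakeShared(std::shared_ptr<Stream> stream) { return stream->id; }

inline int DemoConversion() {
  std::unique_ptr<Stream> owned(new Stream{42});
  return TakeShared(std::move(owned));  // ownership moves into the shared_ptr
}
```

       So a `shared_ptr` parameter accepts both kinds of caller, while a `unique_ptr` parameter excludes anyone whose stream came from an API that already returns `shared_ptr`.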







[GitHub] [arrow] emkornfield commented on a change in pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #10230:
URL: https://github.com/apache/arrow/pull/10230#discussion_r631163305



##########
File path: cpp/src/arrow/csv/writer.cc
##########
@@ -403,34 +415,44 @@ class CSVConverter {
   }
 
   static constexpr int64_t kColumnSizeGuess = 8;
+  io::OutputStream* sink_;
+  std::shared_ptr<io::OutputStream> owned_sink_;
   std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
   std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
   std::shared_ptr<ResizableBuffer> data_buffer_;
   const std::shared_ptr<Schema> schema_;
-  MemoryPool* pool_;
+  WriteOptions options_;
+  ipc::WriteStats stats_;
 };
 
 }  // namespace
 
-Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
+Status WriteCSV(const Table& table, const WriteOptions& options,
                 arrow::io::OutputStream* output) {
-  if (pool == nullptr) {
-    pool = default_memory_pool();
-  }
-  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
-                  CSVConverter::Make(table.schema(), pool));
-  return converter->WriteCSV(table, options, output);
+  ASSIGN_OR_RAISE(auto converter, MakeCSVWriter(output, table.schema(), options));

Review comment:
       nit: should `converter` now be `writer`?  (Same question below.)







[GitHub] [arrow] pitrou closed pull request #10230: ARROW-12512: [C++][Python][Dataset] Create CSV writer class and add Datasets support

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10230:
URL: https://github.com/apache/arrow/pull/10230


   

