Posted to commits@arrow.apache.org by ap...@apache.org on 2021/02/25 19:16:44 UTC

[arrow] branch master updated: ARROW-2229: [C++][Python] Add WriteCsv functionality.

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9baf6  ARROW-2229: [C++][Python] Add WriteCsv functionality.
9a9baf6 is described below

commit 9a9baf6824db91be2c0913367d4b151d9390a4e6
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Thu Feb 25 20:15:54 2021 +0100

    ARROW-2229: [C++][Python] Add WriteCsv functionality.
    
    This offers a possibly performance-naive CSV writer with
    limited options, to keep the initial PR small.
    
    Obvious potential improvements to this approach
    are:
    
    - Smarter casts for dictionaries
    - Arena allocation for intermediate cast results
    
    The implementation also means that, to support all primitive
    types, we might have to fill in gaps in our cast functions.
    
    Closes #9504 from emkornfield/csv
    
    Lead-authored-by: Micah Kornfield <em...@gmail.com>
    Co-authored-by: emkornfield <mi...@google.com>
    Co-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: Micah Kornfield <mi...@google.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/CMakeLists.txt         |   3 +
 cpp/src/arrow/csv/CMakeLists.txt     |  22 +-
 cpp/src/arrow/csv/api.h              |   5 +
 cpp/src/arrow/csv/options.cc         |   1 +
 cpp/src/arrow/csv/options.h          |  15 ++
 cpp/src/arrow/csv/writer.cc          | 437 +++++++++++++++++++++++++++++++++++
 cpp/src/arrow/csv/writer.h           |  47 ++++
 cpp/src/arrow/csv/writer_test.cc     | 129 +++++++++++
 cpp/src/arrow/ipc/json_simple.cc     |   6 +-
 cpp/src/arrow/util/config.h.cmake    |   2 +
 cpp/src/arrow/util/iterator.h        |   3 +-
 python/pyarrow/_csv.pyx              | 103 ++++++++-
 python/pyarrow/csv.py                |   3 +-
 python/pyarrow/includes/libarrow.pxd |  12 +
 python/pyarrow/tests/test_csv.py     |  37 ++-
 15 files changed, 811 insertions(+), 14 deletions(-)
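
For context, here is a minimal usage sketch of the new C++ API added below. This is a hedged illustration, not code from this commit: it assumes a build with ARROW_CSV and ARROW_COMPUTE enabled, and the WriteTableAsCsv wrapper is invented for the example.

    #include <memory>

    #include "arrow/csv/api.h"   // pulls in csv/writer.h when ARROW_COMPUTE is defined
    #include "arrow/io/memory.h"
    #include "arrow/result.h"
    #include "arrow/table.h"

    // Illustrative wrapper, not part of this commit.
    arrow::Status WriteTableAsCsv(const std::shared_ptr<arrow::Table>& table) {
      // Any io::OutputStream works; here the CSV bytes are buffered in memory.
      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::BufferOutputStream> out,
                            arrow::io::BufferOutputStream::Create());
      arrow::csv::WriteOptions options = arrow::csv::WriteOptions::Defaults();
      options.include_header = true;  // emit column names as the first line
      // Passing nullptr for the pool falls back to default_memory_pool().
      return arrow::csv::WriteCSV(*table, options, /*pool=*/nullptr, out.get());
    }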

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 382a851..abd5428 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -349,6 +349,9 @@ if(ARROW_CSV)
               csv/options.cc
               csv/parser.cc
               csv/reader.cc)
+  if(ARROW_COMPUTE)
+    list(APPEND ARROW_SRCS csv/writer.cc)
+  endif()
 
   list(APPEND ARROW_TESTING_SRCS csv/test_common.cc)
 endif()
diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt
index 2766cfd..561faf1 100644
--- a/cpp/src/arrow/csv/CMakeLists.txt
+++ b/cpp/src/arrow/csv/CMakeLists.txt
@@ -15,14 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_arrow_test(csv-test
-               SOURCES
-               chunker_test.cc
-               column_builder_test.cc
-               column_decoder_test.cc
-               converter_test.cc
-               parser_test.cc
-               reader_test.cc)
+set(CSV_TEST_SRCS
+    chunker_test.cc
+    column_builder_test.cc
+    column_decoder_test.cc
+    converter_test.cc
+    parser_test.cc
+    reader_test.cc)
+
+# Writer depends on compute's cast functionality
+if(ARROW_COMPUTE)
+  list(APPEND CSV_TEST_SRCS writer_test.cc)
+endif()
+
+add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS})
 
 add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv")
 add_arrow_benchmark(parser_benchmark PREFIX "arrow-csv")
diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h
index df88843..7bf3931 100644
--- a/cpp/src/arrow/csv/api.h
+++ b/cpp/src/arrow/csv/api.h
@@ -19,3 +19,8 @@
 
 #include "arrow/csv/options.h"
 #include "arrow/csv/reader.h"
+
+// The writer depends on the compute module for casting.
+#ifdef ARROW_COMPUTE
+#include "arrow/csv/writer.h"
+#endif
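
A hedged aside on what this guard means for downstream C++ code: the ARROW_COMPUTE define comes from the generated arrow/util/config.h (updated later in this diff), so user code can apply the same check before relying on the writer:

    #include "arrow/util/config.h"

    #ifdef ARROW_COMPUTE
    #include "arrow/csv/writer.h"  // the CSV writer exists only in compute-enabled builds
    #endif
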
diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc
index b6f1346..a515abf 100644
--- a/cpp/src/arrow/csv/options.cc
+++ b/cpp/src/arrow/csv/options.cc
@@ -34,6 +34,7 @@ ConvertOptions ConvertOptions::Defaults() {
 }
 
 ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+WriteOptions WriteOptions::Defaults() { return WriteOptions(); }
 
 }  // namespace csv
 }  // namespace arrow
diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 82153ed..5c912e7 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -137,5 +137,20 @@ struct ARROW_EXPORT ReadOptions {
   static ReadOptions Defaults();
 };
 
+/// Experimental
+struct ARROW_EXPORT WriteOptions {
+  /// Whether to write an initial header line with column names
+  bool include_header = true;
+
+  /// \brief Maximum number of rows processed at a time
+  ///
+  /// The CSV writer converts and writes data in batches of N rows.
+  /// This number can impact performance.
+  int32_t batch_size = 1024;
+
+  /// Create write options with default values
+  static WriteOptions Defaults();
+};
+
 }  // namespace csv
 }  // namespace arrow
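
A small hedged sketch of overriding these experimental options from C++ (the values here are arbitrary, chosen only for illustration):

    arrow::csv::WriteOptions options = arrow::csv::WriteOptions::Defaults();
    options.include_header = false;  // skip the column-name line
    options.batch_size = 2048;       // rows converted per slice; can affect performance
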
diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
new file mode 100644
index 0000000..ddd59b4
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.cc
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/csv/writer.h"
+#include "arrow/array.h"
+#include "arrow/compute/cast.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/result_internal.h"
+#include "arrow/stl_allocator.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace csv {
+// This implementation is intentionally light on configurability to minimize the size of
+// the initial PR. Additional features can be added as there is demand and interest in
+// implementing them.
+//
+// The algorithm used here, at a high level, is to break RecordBatches/Tables into slices
+// and convert each slice independently.  A slice is converted to CSV by first
+// scanning each column to determine the size of its contents when rendered as a string
+// in CSV. For non-string types this requires casting the value to string (the result
+// of which is cached). This data is used to compute the precise length of each row and
+// to make a single allocation for the final CSV data buffer. Once the final size is
+// known, each column is iterated over again to place its contents into the CSV data
+// buffer. The rationale for choosing this approach is that it allows reuse of the cast
+// functionality in the compute module and the inline data-visiting functionality in
+// the core library. A performance comparison against a naive single-pass approach has
+// not been done; the two-pass approach might still be competitive because it reduces
+// the number of per-row branches that a single-pass approach would need. Profiling
+// would likely yield further opportunities for optimization with this approach.
+
+namespace {
+
+struct SliceIteratorFunctor {
+  Result<std::shared_ptr<RecordBatch>> Next() {
+    if (current_offset < batch->num_rows()) {
+      std::shared_ptr<RecordBatch> next = batch->Slice(current_offset, slice_size);
+      current_offset += slice_size;
+      return next;
+    }
+    return IterationTraits<std::shared_ptr<RecordBatch>>::End();
+  }
+  const RecordBatch* const batch;
+  const int64_t slice_size;
+  int64_t current_offset;
+};
+
+RecordBatchIterator RecordBatchSliceIterator(const RecordBatch& batch,
+                                             int64_t slice_size) {
+  SliceIteratorFunctor functor = {&batch, slice_size, /*offset=*/static_cast<int64_t>(0)};
+  return RecordBatchIterator(std::move(functor));
+}
+
+// Counts the number of characters that need escaping in s.
+int64_t CountEscapes(util::string_view s) {
+  return static_cast<int64_t>(std::count(s.begin(), s.end(), '"'));
+}
+
+// Matching quote pair character length.
+constexpr int64_t kQuoteCount = 2;
+constexpr int64_t kQuoteDelimiterCount = kQuoteCount + /*end_char*/ 1;
+
+// Interface for generating CSV data per column.
+// The intended usage is to iteratively call UpdateRowLengths for a column and
+// then PopulateColumns. PopulateColumns must be called in the reverse order of the
+// populators (it populates data backwards).
+class ColumnPopulator {
+ public:
+  ColumnPopulator(MemoryPool* pool, char end_char) : end_char_(end_char), pool_(pool) {}
+
+  virtual ~ColumnPopulator() = default;
+
+  // Adds the number of characters each entry in data will add to the corresponding
+  // elements in row_lengths.
+  Status UpdateRowLengths(const Array& data, int32_t* row_lengths) {
+    compute::ExecContext ctx(pool_);
+    // Populators are intended to be applied to reasonably small data.  In most cases
+    // threading overhead would not be justified.
+    ctx.set_use_threads(false);
+    ASSIGN_OR_RAISE(
+        std::shared_ptr<Array> casted,
+        compute::Cast(data, /*to_type=*/utf8(), compute::CastOptions(), &ctx));
+    casted_array_ = internal::checked_pointer_cast<StringArray>(casted);
+    return UpdateRowLengths(row_lengths);
+  }
+
+  // Places string data onto each row in output and updates the corresponding row
+  // offsets in preparation for calls to other (preceding) ColumnPopulators.
+  // Args:
+  //   output: character buffer to write to.
+  //   offsets: an array of per-row end-of-column offsets within the output buffer
+  //   (values are one past the last position to write to).
+  virtual void PopulateColumns(char* output, int32_t* offsets) const = 0;
+
+ protected:
+  virtual Status UpdateRowLengths(int32_t* row_lengths) = 0;
+  std::shared_ptr<StringArray> casted_array_;
+  const char end_char_;
+
+ private:
+  MemoryPool* const pool_;
+};
+
+// Copies the contents of s to out_end (writing backwards), escaping characters as
+// needed. Returns the position prior to the last copied character (decrementing out_end).
+char* EscapeReverse(arrow::util::string_view s, char* out_end) {
+  for (const char* val = s.data() + s.length() - 1; val >= s.data(); val--, out_end--) {
+    if (*val == '"') {
+      *out_end = *val;
+      out_end--;
+    }
+    *out_end = *val;
+  }
+  return out_end;
+}
+
+// Populator for non-string types.  This populator relies on the compute module's Cast
+// functionality to cast to String; if no such cast exists, it is an error.  It also
+// assumes that the string resulting from the cast does not require quoting or escaping.
+class UnquotedColumnPopulator : public ColumnPopulator {
+ public:
+  explicit UnquotedColumnPopulator(MemoryPool* memory_pool, char end_char)
+      : ColumnPopulator(memory_pool, end_char) {}
+
+  Status UpdateRowLengths(int32_t* row_lengths) override {
+    for (int x = 0; x < casted_array_->length(); x++) {
+      row_lengths[x] += casted_array_->value_length(x);
+    }
+    return Status::OK();
+  }
+
+  void PopulateColumns(char* output, int32_t* offsets) const override {
+    VisitArrayDataInline<StringType>(
+        *casted_array_->data(),
+        [&](arrow::util::string_view s) {
+          int64_t next_column_offset = s.length() + /*end_char*/ 1;
+          memcpy((output + *offsets - next_column_offset), s.data(), s.length());
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= static_cast<int32_t>(next_column_offset);
+          offsets++;
+        },
+        [&]() {
+          // Nulls are empty (unquoted) to distinguish them from the empty string.
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= 1;
+          offsets++;
+        });
+  }
+};
+
+// Strings need special handling to ensure they are escaped properly.
+// This class handles escaping assuming that all strings will be quoted,
+// that the only character within a string that needs to be escaped is
+// a quote character ("), and that escaping is done by adding another quote.
+class QuotedColumnPopulator : public ColumnPopulator {
+ public:
+  QuotedColumnPopulator(MemoryPool* pool, char end_char)
+      : ColumnPopulator(pool, end_char) {}
+
+  Status UpdateRowLengths(int32_t* row_lengths) override {
+    const StringArray& input = *casted_array_;
+    int row_number = 0;
+    row_needs_escaping_.resize(casted_array_->length());
+    VisitArrayDataInline<StringType>(
+        *input.data(),
+        [&](arrow::util::string_view s) {
+          int64_t escaped_count = CountEscapes(s);
+          // TODO: Maybe use 64 bit row lengths or safe cast?
+          row_needs_escaping_[row_number] = escaped_count > 0;
+          row_lengths[row_number] += static_cast<int32_t>(s.length()) +
+                                     static_cast<int32_t>(escaped_count + kQuoteCount);
+          row_number++;
+        },
+        [&]() {
+          row_needs_escaping_[row_number] = false;
+          row_number++;
+        });
+    return Status::OK();
+  }
+
+  void PopulateColumns(char* output, int32_t* offsets) const override {
+    auto needs_escaping = row_needs_escaping_.begin();
+    VisitArrayDataInline<StringType>(
+        *(casted_array_->data()),
+        [&](arrow::util::string_view s) {
+          // still needs string content length to be added
+          char* row_end = output + *offsets;
+          int32_t next_column_offset = 0;
+          if (!*needs_escaping) {
+            next_column_offset = static_cast<int32_t>(s.length() + kQuoteDelimiterCount);
+            memcpy(row_end - next_column_offset + /*quote_offset=*/1, s.data(),
+                   s.length());
+          } else {
+            // Adjust row_end by 3: 1 quote char, 1 end char, and 1 to land on the
+            // first position to write to.
+            next_column_offset =
+                static_cast<int32_t>(row_end - EscapeReverse(s, row_end - 3));
+          }
+          *(row_end - next_column_offset) = '"';
+          *(row_end - 2) = '"';
+          *(row_end - 1) = end_char_;
+          *offsets -= next_column_offset;
+          offsets++;
+          needs_escaping++;
+        },
+        [&]() {
+          // Nulls are empty (unquoted) to distinguish them from the empty string.
+          *(output + *offsets - 1) = end_char_;
+          *offsets -= 1;
+          offsets++;
+          needs_escaping++;
+        });
+  }
+
+ private:
+  // Older versions of GCC don't support custom allocators here;
+  // at some point we should change this to use a memory_pool-backed
+  // allocator.
+  std::vector<bool> row_needs_escaping_;
+};
+
+struct PopulatorFactory {
+  template <typename TypeClass>
+  enable_if_t<is_base_binary_type<TypeClass>::value ||
+                  std::is_same<FixedSizeBinaryType, TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new QuotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  template <typename TypeClass>
+  enable_if_dictionary<TypeClass, Status> Visit(const TypeClass& type) {
+    return VisitTypeInline(*type.value_type(), this);
+  }
+
+  template <typename TypeClass>
+  enable_if_t<is_nested_type<TypeClass>::value || is_extension_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    return Status::Invalid("Unsupported Type:", type.ToString());
+  }
+
+  template <typename TypeClass>
+  enable_if_t<is_primitive_ctype<TypeClass>::value || is_decimal_type<TypeClass>::value ||
+                  is_null_type<TypeClass>::value || is_temporal_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new UnquotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  char end_char;
+  MemoryPool* pool;
+  ColumnPopulator* populator;
+};
+
+Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char end_char,
+                                                       MemoryPool* pool) {
+  PopulatorFactory factory{end_char, pool, nullptr};
+  RETURN_NOT_OK(VisitTypeInline(*field.type(), &factory));
+  return std::unique_ptr<ColumnPopulator>(factory.populator);
+}
+
+class CSVConverter {
+ public:
+  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
+                                                    MemoryPool* pool) {
+    std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
+    for (int col = 0; col < schema->num_fields(); col++) {
+      char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
+      ASSIGN_OR_RAISE(populators[col],
+                      MakePopulator(*schema->field(col), end_char, pool));
+    }
+    return std::unique_ptr<CSVConverter>(
+        new CSVConverter(std::move(schema), std::move(populators), pool));
+  }
+
+  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+                  io::OutputStream* out) {
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+    for (auto maybe_slice : iterator) {
+      ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
+      RETURN_NOT_OK(TranslateMinimalBatch(*slice));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+    }
+    return Status::OK();
+  }
+
+  Status WriteCSV(const Table& table, const WriteOptions& options,
+                  io::OutputStream* out) {
+    TableBatchReader reader(table);
+    reader.set_chunksize(options.batch_size);
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader.ReadNext(&batch));
+    while (batch != nullptr) {
+      RETURN_NOT_OK(TranslateMinimalBatch(*batch));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(reader.ReadNext(&batch));
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  CSVConverter(std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
+      : column_populators_(std::move(populators)),
+        offsets_(0, 0, ::arrow::stl::allocator<char*>(pool)),
+        schema_(std::move(schema)),
+        pool_(pool) {}
+
+  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+    if (data_buffer_ == nullptr) {
+      ASSIGN_OR_RAISE(
+          data_buffer_,
+          AllocateResizableBuffer(
+              options.batch_size * schema_->num_fields() * kColumnSizeGuess, pool_));
+    }
+    if (options.include_header) {
+      RETURN_NOT_OK(WriteHeader(out));
+    }
+    return Status::OK();
+  }
+
+  int64_t CalculateHeaderSize() const {
+    int64_t header_length = 0;
+    for (int col = 0; col < schema_->num_fields(); col++) {
+      const std::string& col_name = schema_->field(col)->name();
+      header_length += col_name.size();
+      header_length += CountEscapes(col_name);
+    }
+    return header_length + (kQuoteDelimiterCount * schema_->num_fields());
+  }
+
+  Status WriteHeader(io::OutputStream* out) {
+    RETURN_NOT_OK(data_buffer_->Resize(CalculateHeaderSize(), /*shrink_to_fit=*/false));
+    char* next =
+        reinterpret_cast<char*>(data_buffer_->mutable_data() + data_buffer_->size() - 1);
+    for (int col = schema_->num_fields() - 1; col >= 0; col--) {
+      *next-- = ',';
+      *next-- = '"';
+      next = EscapeReverse(schema_->field(col)->name(), next);
+      *next-- = '"';
+    }
+    *(data_buffer_->mutable_data() + data_buffer_->size() - 1) = '\n';
+    DCHECK_EQ(reinterpret_cast<uint8_t*>(next + 1), data_buffer_->data());
+    return out->Write(data_buffer_);
+  }
+
+  Status TranslateMinimalBatch(const RecordBatch& batch) {
+    if (batch.num_rows() == 0) {
+      return Status::OK();
+    }
+    offsets_.resize(batch.num_rows());
+    std::fill(offsets_.begin(), offsets_.end(), 0);
+
+    // Calculate relative offsets for each row (excluding delimiters)
+    for (int32_t col = 0; col < static_cast<int32_t>(column_populators_.size()); col++) {
+      RETURN_NOT_OK(
+          column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets_.data()));
+    }
+    // Calculate cumulative offsets for each row (including delimiters).
+    offsets_[0] += batch.num_columns();
+    for (int64_t row = 1; row < batch.num_rows(); row++) {
+      offsets_[row] += offsets_[row - 1] + /*delimiter lengths*/ batch.num_columns();
+    }
+    // Resize the target buffer to the required size. We assume batch-to-batch sizes
+    // will be pretty close, so we don't shrink the buffer, to avoid allocation churn.
+    RETURN_NOT_OK(data_buffer_->Resize(offsets_.back(), /*shrink_to_fit=*/false));
+
+    // Use the offsets to populate contents.
+    for (auto populator = column_populators_.rbegin();
+         populator != column_populators_.rend(); populator++) {
+      (*populator)
+          ->PopulateColumns(reinterpret_cast<char*>(data_buffer_->mutable_data()),
+                            offsets_.data());
+    }
+    DCHECK_EQ(0, offsets_[0]);
+    return Status::OK();
+  }
+
+  static constexpr int64_t kColumnSizeGuess = 8;
+  std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
+  std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
+  std::shared_ptr<ResizableBuffer> data_buffer_;
+  const std::shared_ptr<Schema> schema_;
+  MemoryPool* pool_;
+};
+
+}  // namespace
+
+Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  if (pool == nullptr) {
+    pool = default_memory_pool();
+  }
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(table.schema(), pool));
+  return converter->WriteCSV(table, options, output);
+}
+
+Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  if (pool == nullptr) {
+    pool = default_memory_pool();
+  }
+
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(batch.schema(), pool));
+  return converter->WriteCSV(batch, options, output);
+}
+
+}  // namespace csv
+}  // namespace arrow
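
To make the two-pass scheme described at the top of writer.cc concrete, here is a simplified, self-contained sketch for a single quoted column. It is an illustration only, not the code above: the real writer accumulates per-row lengths across all columns into offsets_ rather than a single total, since several columns contribute to each row.

    #include <algorithm>
    #include <string>
    #include <vector>

    // Pass 1 sizes every rendered cell; pass 2 fills one allocation back to
    // front, mirroring the EscapeReverse/PopulateColumns pattern above.
    std::string RenderQuotedColumn(const std::vector<std::string>& cells) {
      size_t total = 0;
      for (const std::string& cell : cells) {
        size_t escapes = std::count(cell.begin(), cell.end(), '"');
        total += cell.size() + escapes + 3;  // 2 quotes + 1 end char per row
      }
      std::string out(total, '\0');  // the single allocation for the buffer
      char* end = &out[0] + total;
      for (size_t i = cells.size(); i-- > 0;) {  // rows in reverse
        *--end = '\n';  // end char (',' would be used for non-terminal columns)
        *--end = '"';
        for (size_t j = cells[i].size(); j-- > 0;) {
          *--end = cells[i][j];
          if (cells[i][j] == '"') *--end = '"';  // escape by doubling the quote
        }
        *--end = '"';
      }
      return out;
    }
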
diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h
new file mode 100644
index 0000000..c009d78
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/csv/options.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+
+namespace arrow {
+namespace csv {
+// Functionality for converting Arrow data to comma-separated value (CSV) text.
+// This library supports all primitive types that can be cast to a StringArray.
+// It applies the following formatting rules:
+//  - For non-binary types no quotes surround values.  Nulls are represented as the empty
+//  string.
+//  - For binary types all non-null data is quoted (and quotes within data are escaped
+//  with an additional quote).
+//    Null values are empty and unquoted.
+//  - LF (\n) is always used as a line ending.
+
+/// \brief Converts the table to CSV and writes the result to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options,
+                             MemoryPool* pool, arrow::io::OutputStream* output);
+/// \brief Converts the batch to CSV and writes the result to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+                             MemoryPool* pool, arrow::io::OutputStream* output);
+
+}  // namespace csv
+}  // namespace arrow
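
Worked against these rules (and consistent with the cases in writer_test.cc below), a two-column (int64, utf8) table would render as:

    a (int64)   b (utf8)         rendered line
    1           abc              1,"abc"
    2           a"b              2,"a""b"
    3           "" (empty)       3,""
    null        null             ,

Both null columns render empty, but an empty utf8 string renders as "", which is what keeps it distinguishable from a null. Every line ends with LF.
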
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
new file mode 100644
index 0000000..dc59fef
--- /dev/null
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <memory>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/writer.h"
+#include "arrow/io/memory.h"
+#include "arrow/record_batch.h"
+#include "arrow/result_internal.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace csv {
+
+struct TestParams {
+  std::shared_ptr<RecordBatch> record_batch;
+  WriteOptions options;
+  std::string expected_output;
+};
+
+WriteOptions DefaultTestOptions(bool include_header) {
+  WriteOptions options;
+  options.batch_size = 5;
+  options.include_header = include_header;
+  return options;
+}
+
+std::vector<TestParams> GenerateTestCases() {
+  auto abc_schema = schema({
+      {field("a", uint64())},
+      {field("b\"", utf8())},
+      {field("c ", int32())},
+  });
+  auto empty_batch =
+      RecordBatch::Make(abc_schema, /*num_rows=*/0,
+                        {
+                            ArrayFromJSON(abc_schema->field(0)->type(), "[]"),
+                            ArrayFromJSON(abc_schema->field(1)->type(), "[]"),
+                            ArrayFromJSON(abc_schema->field(2)->type(), "[]"),
+                        });
+  auto populated_batch = RecordBatchFromJSON(abc_schema, R"([{"a": 1, "c ": -1},
+                                                         { "a": 1, "b\"": "abc\"efg", "c ": 2324},
+                                                         { "b\"": "abcd", "c ": 5467},
+                                                         { },
+                                                         { "a": 546, "b\"": "", "c ": 517 },
+                                                         { "a": 124, "b\"": "a\"\"b\"" }])");
+  std::string expected_without_header = std::string("1,,-1") + "\n" +     // line 1
+                                        R"(1,"abc""efg",2324)" + "\n" + // line 2
+                                        R"(,"abcd",5467)" + "\n" +        // line 3
+                                        R"(,,)" + "\n" +                  // line 4
+                                        R"(546,"",517)" + "\n" +          // line 5
+                                        R"(124,"a""""b""",)" + "\n";      // line 6
+  std::string expected_header = std::string(R"("a","b""","c ")") + "\n";
+
+  return std::vector<TestParams>{
+      {empty_batch, DefaultTestOptions(/*header=*/false), ""},
+      {empty_batch, DefaultTestOptions(/*header=*/true), expected_header},
+      {populated_batch, DefaultTestOptions(/*header=*/false), expected_without_header},
+      {populated_batch, DefaultTestOptions(/*header=*/true),
+       expected_header + expected_without_header}};
+}
+
+class TestWriteCSV : public ::testing::TestWithParam<TestParams> {
+ protected:
+  template <typename Data>
+  Result<std::string> ToCsvString(const Data& data, const WriteOptions& options) {
+    std::shared_ptr<io::BufferOutputStream> out;
+    ASSIGN_OR_RAISE(out, io::BufferOutputStream::Create());
+
+    RETURN_NOT_OK(WriteCSV(data, options, default_memory_pool(), out.get()));
+    ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer, out->Finish());
+    return std::string(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+  }
+};
+
+TEST_P(TestWriteCSV, TestWrite) {
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::BufferOutputStream> out,
+                       io::BufferOutputStream::Create());
+  WriteOptions options = GetParam().options;
+  std::string csv;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+
+  // Batch size shouldn't matter.
+  options.batch_size /= 2;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*GetParam().record_batch, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+
+  // Table and Record batch should work identically.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+                       Table::FromRecordBatches({GetParam().record_batch}));
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*table, options));
+  EXPECT_EQ(csv, GetParam().expected_output);
+}
+
+INSTANTIATE_TEST_SUITE_P(MultiColumnWriteCSVTest, TestWriteCSV,
+                         ::testing::ValuesIn(GenerateTestCases()));
+
+INSTANTIATE_TEST_SUITE_P(
+    SingleColumnWriteCSVTest, TestWriteCSV,
+    ::testing::Values(TestParams{
+        RecordBatchFromJSON(schema({field("int64", int64())}),
+                            R"([{ "int64": 9999}, {}, { "int64": -15}])"),
+        WriteOptions(),
+        R"("int64")"
+        "\n9999\n\n-15\n"}));
+
+}  // namespace csv
+}  // namespace arrow
diff --git a/cpp/src/arrow/ipc/json_simple.cc b/cpp/src/arrow/ipc/json_simple.cc
index fba8194..caf6fd0 100644
--- a/cpp/src/arrow/ipc/json_simple.cc
+++ b/cpp/src/arrow/ipc/json_simple.cc
@@ -43,6 +43,7 @@
 #include <rapidjson/error/en.h>
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/reader.h>
+#include <rapidjson/writer.h>
 
 namespace rj = arrow::rapidjson;
 
@@ -652,8 +653,11 @@ class StructConverter final : public ConcreteConverter<StructConverter> {
         }
       }
       if (remaining > 0) {
+        rj::StringBuffer sb;
+        rj::Writer<rj::StringBuffer> writer(sb);
+        json_obj.Accept(writer);
         return Status::Invalid("Unexpected members in JSON object for type ",
-                               type_->ToString());
+                               type_->ToString(), " Object: ", sb.GetString());
       }
       return builder_->Append();
     }
diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake
index 8f8dea0..be6686f 100644
--- a/cpp/src/arrow/util/config.h.cmake
+++ b/cpp/src/arrow/util/config.h.cmake
@@ -34,6 +34,8 @@
 
 #define ARROW_PACKAGE_KIND "@ARROW_PACKAGE_KIND@"
 
+#cmakedefine ARROW_COMPUTE
+
 #cmakedefine ARROW_S3
 #cmakedefine ARROW_USE_NATIVE_INT128
 
diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h
index 75ccf28..771b209 100644
--- a/cpp/src/arrow/util/iterator.h
+++ b/cpp/src/arrow/util/iterator.h
@@ -64,7 +64,8 @@ template <typename T>
 class Iterator : public util::EqualityComparable<Iterator<T>> {
  public:
   /// \brief Iterator may be constructed from any type which has a member function
-  /// with signature Status Next(T*);
+  /// with signature Result<T> Next();
+  /// End of iterator is signalled by returning IterationTraits<T>::End();
   ///
   /// The argument is moved or copied to the heap and kept in a unique_ptr<void>. Only
   /// its destructor and its Next method (which are stored in function pointers) are
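
SliceIteratorFunctor in writer.cc above follows exactly this contract. As a hedged, generic sketch (relying on the default IterationTraits, for which End() of a shared_ptr is null):

    #include <memory>

    #include "arrow/result.h"
    #include "arrow/util/iterator.h"

    // Any type with Result<T> Next() can back an arrow::Iterator<T>.
    struct CountdownFunctor {
      int remaining;
      arrow::Result<std::shared_ptr<int>> Next() {
        if (remaining <= 0) {
          // Signal end-of-iteration per the doc comment above.
          return arrow::IterationTraits<std::shared_ptr<int>>::End();
        }
        return std::make_shared<int>(remaining--);
      }
    };

    // Usage: arrow::Iterator<std::shared_ptr<int>> it(CountdownFunctor{3});
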
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 4068a0b..f5b8e4d 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -30,10 +30,13 @@ from pyarrow.includes.libarrow cimport *
 from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                           RecordBatchReader, ensure_type,
                           maybe_unbox_memory_pool, get_input_stream,
-                          native_transcoding_input_stream,
+                          get_writer, native_transcoding_input_stream,
+                          pyarrow_unwrap_batch, pyarrow_unwrap_table,
                           pyarrow_wrap_schema, pyarrow_wrap_table,
-                          pyarrow_wrap_data_type, pyarrow_unwrap_data_type)
+                          pyarrow_wrap_data_type, pyarrow_unwrap_data_type,
+                          Table, RecordBatch)
 from pyarrow.lib import frombytes, tobytes
+from pyarrow.util import _stringify_path
 
 
 cdef unsigned char _single_char(s) except 0:
@@ -763,3 +766,99 @@ def open_csv(input_file, read_options=None, parse_options=None,
                  move(c_convert_options),
                  maybe_unbox_memory_pool(memory_pool))
     return reader
+
+
+cdef class WriteOptions(_Weakrefable):
+    """
+    Options for writing CSV files.
+
+    Parameters
+    ----------
+    include_header : bool, optional (default True)
+        Whether to write an initial header line with column names
+    batch_size : int, optional (default 1024)
+        How many rows to process together when converting and writing
+        CSV data
+    """
+    cdef:
+        CCSVWriteOptions options
+
+    # Avoid mistakenly creating attributes
+    __slots__ = ()
+
+    def __init__(self, *, include_header=None, batch_size=None):
+        self.options = CCSVWriteOptions.Defaults()
+        if include_header is not None:
+            self.include_header = include_header
+        if batch_size is not None:
+            self.batch_size = batch_size
+
+    @property
+    def include_header(self):
+        """
+        Whether to write an initial header line with column names.
+        """
+        return self.options.include_header
+
+    @include_header.setter
+    def include_header(self, value):
+        self.options.include_header = value
+
+    @property
+    def batch_size(self):
+        """
+        How many rows to process together when converting and writing
+        CSV data.
+        """
+        return self.options.batch_size
+
+    @batch_size.setter
+    def batch_size(self, value):
+        self.options.batch_size = value
+
+
+cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
+    if write_options is None:
+        out[0] = CCSVWriteOptions.Defaults()
+    else:
+        out[0] = write_options.options
+
+
+def write_csv(data, output_file, write_options=None,
+              MemoryPool memory_pool=None):
+    """
+    Write record batch or table to a CSV file.
+
+    Parameters
+    ----------
+    data: pyarrow.RecordBatch or pyarrow.Table
+        The data to write.
+    output_file: string, path, pyarrow.OutputStream or file-like object
+        The location where to write the CSV data.
+    write_options: pyarrow.csv.WriteOptions
+        Options to configure writing the CSV data.
+    memory_pool: MemoryPool, optional
+        Pool for temporary allocations.
+    """
+    cdef:
+        shared_ptr[COutputStream] stream
+        CCSVWriteOptions c_write_options
+        CMemoryPool* c_memory_pool
+        CRecordBatch* batch
+        CTable* table
+    _get_write_options(write_options, &c_write_options)
+
+    get_writer(output_file, &stream)
+    c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    if isinstance(data, RecordBatch):
+        batch = pyarrow_unwrap_batch(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
+                                  stream.get()))
+    elif isinstance(data, Table):
+        table = pyarrow_unwrap_table(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
+                                  stream.get()))
+    else:
+        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py
index b116ea1..fc1dcaf 100644
--- a/python/pyarrow/csv.py
+++ b/python/pyarrow/csv.py
@@ -18,4 +18,5 @@
 
 from pyarrow._csv import (  # noqa
     ReadOptions, ParseOptions, ConvertOptions, ISO8601,
-    open_csv, read_csv, CSVStreamingReader)
+    open_csv, read_csv, CSVStreamingReader, write_csv,
+    WriteOptions)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ba3c3ad..a4f6f18 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1618,6 +1618,13 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
         @staticmethod
         CCSVReadOptions Defaults()
 
+    cdef cppclass CCSVWriteOptions" arrow::csv::WriteOptions":
+        c_bool include_header
+        int32_t batch_size
+
+        @staticmethod
+        CCSVWriteOptions Defaults()
+
     cdef cppclass CCSVReader" arrow::csv::TableReader":
         @staticmethod
         CResult[shared_ptr[CCSVReader]] Make(
@@ -1633,6 +1640,11 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
             CMemoryPool*, shared_ptr[CInputStream],
             CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions)
 
+    cdef CStatus WriteCSV(
+        CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+    cdef CStatus WriteCSV(
+        CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+
 
 cdef extern from "arrow/json/options.h" nogil:
 
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 462fe01..5ca31ae 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -36,7 +36,8 @@ import numpy as np
 
 import pyarrow as pa
 from pyarrow.csv import (
-    open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601)
+    open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601,
+    write_csv, WriteOptions)
 
 
 def generate_col_names():
@@ -203,6 +204,21 @@ def test_convert_options():
     assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d']
 
 
+def test_write_options():
+    cls = WriteOptions
+    opts = cls()
+
+    check_options_class(
+        cls, include_header=[True, False])
+
+    assert opts.batch_size > 0
+    opts.batch_size = 12345
+    assert opts.batch_size == 12345
+
+    opts = cls(batch_size=9876)
+    assert opts.batch_size == 9876
+
+
 class BaseTestCSVRead:
 
     def read_bytes(self, b, **kwargs):
@@ -1257,3 +1273,22 @@ def test_read_csv_does_not_close_passed_file_handles():
     buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
     read_csv(buf)
     assert not buf.closed
+
+
+def test_write_read_round_trip():
+    t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"])
+    record_batch = t.to_batches(max_chunksize=4)[0]
+    for data in [t, record_batch]:
+        # Test with header
+        buf = io.BytesIO()
+        write_csv(data, buf, WriteOptions(include_header=True))
+        buf.seek(0)
+        assert t == read_csv(buf)
+
+        # Test without header
+        buf = io.BytesIO()
+        write_csv(data, buf, WriteOptions(include_header=False))
+        buf.seek(0)
+
+        read_options = ReadOptions(column_names=t.column_names)
+        assert t == read_csv(buf, read_options=read_options)