You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/02/25 19:16:44 UTC
[arrow] branch master updated: ARROW-2229: [C++][Python] Add
WriteCsv functionality.
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 9a9baf6 ARROW-2229: [C++][Python] Add WriteCsv functionality.
9a9baf6 is described below
commit 9a9baf6824db91be2c0913367d4b151d9390a4e6
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Thu Feb 25 20:15:54 2021 +0100
ARROW-2229: [C++][Python] Add WriteCsv functionality.
This offers a possibly performance-naive CSV writer with
limited options to keep the initial PR down.
Obvious potential improvements to this approach
are:
- Smarter casts for dictionaries
- Arena allocation for intermediate cast results
The implementation also means that for all primitive type
support we might have to fill in gaps in our cast function.
Closes #9504 from emkornfield/csv
Lead-authored-by: Micah Kornfield <em...@gmail.com>
Co-authored-by: emkornfield <mi...@google.com>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: Micah Kornfield <mi...@google.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/CMakeLists.txt | 3 +
cpp/src/arrow/csv/CMakeLists.txt | 22 +-
cpp/src/arrow/csv/api.h | 5 +
cpp/src/arrow/csv/options.cc | 1 +
cpp/src/arrow/csv/options.h | 15 ++
cpp/src/arrow/csv/writer.cc | 437 +++++++++++++++++++++++++++++++++++
cpp/src/arrow/csv/writer.h | 47 ++++
cpp/src/arrow/csv/writer_test.cc | 129 +++++++++++
cpp/src/arrow/ipc/json_simple.cc | 6 +-
cpp/src/arrow/util/config.h.cmake | 2 +
cpp/src/arrow/util/iterator.h | 3 +-
python/pyarrow/_csv.pyx | 103 ++++++++-
python/pyarrow/csv.py | 3 +-
python/pyarrow/includes/libarrow.pxd | 12 +
python/pyarrow/tests/test_csv.py | 37 ++-
15 files changed, 811 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 382a851..abd5428 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -349,6 +349,9 @@ if(ARROW_CSV)
csv/options.cc
csv/parser.cc
csv/reader.cc)
+ if(ARROW_COMPUTE)
+ list(APPEND ARROW_SRCS csv/writer.cc)
+ endif()
list(APPEND ARROW_TESTING_SRCS csv/test_common.cc)
endif()
diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt
index 2766cfd..561faf1 100644
--- a/cpp/src/arrow/csv/CMakeLists.txt
+++ b/cpp/src/arrow/csv/CMakeLists.txt
@@ -15,14 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-add_arrow_test(csv-test
- SOURCES
- chunker_test.cc
- column_builder_test.cc
- column_decoder_test.cc
- converter_test.cc
- parser_test.cc
- reader_test.cc)
+set(CSV_TEST_SRCS
+ chunker_test.cc
+ column_builder_test.cc
+ column_decoder_test.cc
+ converter_test.cc
+ parser_test.cc
+ reader_test.cc)
+
+# Writer depends on compute's cast functionality
+if(ARROW_COMPUTE)
+ list(APPEND CSV_TEST_SRCS writer_test.cc)
+endif()
+
+add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS})
add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv")
add_arrow_benchmark(parser_benchmark PREFIX "arrow-csv")
diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h
index df88843..7bf3931 100644
--- a/cpp/src/arrow/csv/api.h
+++ b/cpp/src/arrow/csv/api.h
@@ -19,3 +19,8 @@
#include "arrow/csv/options.h"
#include "arrow/csv/reader.h"
+
+// The writer depends on compute module for casting.
+#ifdef ARROW_COMPUTE
+#include "arrow/csv/writer.h"
+#endif
diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc
index b6f1346..a515abf 100644
--- a/cpp/src/arrow/csv/options.cc
+++ b/cpp/src/arrow/csv/options.cc
@@ -34,6 +34,7 @@ ConvertOptions ConvertOptions::Defaults() {
}
ReadOptions ReadOptions::Defaults() { return ReadOptions(); }
+WriteOptions WriteOptions::Defaults() { return WriteOptions(); }
} // namespace csv
} // namespace arrow
diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h
index 82153ed..5c912e7 100644
--- a/cpp/src/arrow/csv/options.h
+++ b/cpp/src/arrow/csv/options.h
@@ -137,5 +137,20 @@ struct ARROW_EXPORT ReadOptions {
static ReadOptions Defaults();
};
+/// \brief Options controlling how Arrow data is written out as CSV
+///
+/// Experimental
+struct ARROW_EXPORT WriteOptions {
+ /// Whether to write an initial header line with column names
+ bool include_header = true;
+
+ /// \brief Maximum number of rows processed at a time
+ ///
+ /// The CSV writer converts and writes data in batches of N rows.
+ /// This number can impact performance.
+ int32_t batch_size = 1024;
+
+ /// Create write options with default values
+ static WriteOptions Defaults();
+};
+
} // namespace csv
} // namespace arrow
diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
new file mode 100644
index 0000000..ddd59b4
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.cc
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/csv/writer.h"
+#include "arrow/array.h"
+#include "arrow/compute/cast.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/result_internal.h"
+#include "arrow/stl_allocator.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
+
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+namespace csv {
+// This implementation is intentionally light on configurability to minimize the size of
+// the initial PR. Additional features can be added as there is demand and interest to
+// implement them.
+//
+// The algorithm used here at a high level is to break RecordBatches/Tables into slices
+// and convert each slice independently. A slice is then converted to CSV by first
+// scanning each column to determine the size of its contents when rendered as a string in
+// CSV. For non-string types this requires casting the value to string (which is cached).
+// This data is used to understand the precise length of each row and a single allocation
+// for the final CSV data buffer. Once the final size is known each column is then
+// iterated over again to place its contents into the CSV data buffer. The rationale for
+// choosing this approach is it allows for reuse of the cast functionality in the compute
+// module and inline data visiting functionality in the core library. A performance
+// comparison has not been done using a naive single-pass approach. This approach might
+// still be competitive due to reduction in the number of per row branches necessary with
+// a single pass approach. Profiling would likely yield further opportunities for
+// optimization with this approach.
+
+namespace {
+
+// Generator that walks a RecordBatch in consecutive windows of `slice_size` rows.
+// It is meant to be wrapped in a RecordBatchIterator; the referenced batch is not
+// owned and must outlive the functor.
+struct SliceIteratorFunctor {
+  // Returns the slice starting at `current_offset` and advances the offset by
+  // `slice_size`, or the end-of-iteration sentinel once the offset has reached
+  // the number of rows in the batch.
+  Result<std::shared_ptr<RecordBatch>> Next() {
+    if (current_offset >= batch->num_rows()) {
+      return IterationTraits<std::shared_ptr<RecordBatch>>::End();
+    }
+    const int64_t slice_start = current_offset;
+    current_offset += slice_size;
+    return batch->Slice(slice_start, slice_size);
+  }
+  // Batch being sliced (borrowed).
+  const RecordBatch* const batch;
+  // Number of rows requested for each slice.
+  const int64_t slice_size;
+  // Row index at which the next slice starts.
+  int64_t current_offset;
+};
+
+// Creates an iterator over `batch` that yields slices of `slice_size` rows, starting
+// at row 0. `batch` is captured by address and must outlive the returned iterator.
+RecordBatchIterator RecordBatchSliceIterator(const RecordBatch& batch,
+                                             int64_t slice_size) {
+  return RecordBatchIterator(
+      SliceIteratorFunctor{&batch, slice_size, /*current_offset=*/int64_t{0}});
+}
+
+// Returns how many characters of s need escaping, i.e. the number of
+// double-quote (") characters it contains.
+int64_t CountEscapes(util::string_view s) {
+  int64_t quote_count = 0;
+  for (const char c : s) {
+    if (c == '"') {
+      ++quote_count;
+    }
+  }
+  return quote_count;
+}
+
+// Matching quote pair character length: the opening and the closing quote that
+// surround a quoted value.
+constexpr int64_t kQuoteCount = 2;
+// Characters a quoted value adds besides its (escaped) content: the quote pair plus
+// the single end character that terminates the field.
+constexpr int64_t kQuoteDelimiterCount = kQuoteCount + /*end_char*/ 1;
+
+// Interface for generating CSV data per column.
+// The intended usage is to iteratively call UpdateRowLengths for a column and
+// then PopulateColumns. PopulateColumns must be called in the reverse order of the
+// populators (it populates data backwards).
+class ColumnPopulator {
+ public:
+ // Args:
+ // pool: memory pool used for the cast performed in UpdateRowLengths.
+ // end_char: character written after each value of this column.
+ ColumnPopulator(MemoryPool* pool, char end_char) : end_char_(end_char), pool_(pool) {}
+
+ virtual ~ColumnPopulator() = default;
+
+ // Adds the number of characters each entry in data will contribute to the
+ // corresponding element of row_lengths.
+ // The data is first cast to a string array, which is cached in casted_array_ for
+ // the subsequent PopulateColumns call. Returns an error if the cast fails.
+ Status UpdateRowLengths(const Array& data, int32_t* row_lengths) {
+ compute::ExecContext ctx(pool_);
+ // Populators are intended to be applied to reasonably small data. In most cases
+ // threading overhead would not be justified.
+ ctx.set_use_threads(false);
+ ASSIGN_OR_RAISE(
+ std::shared_ptr<Array> casted,
+ compute::Cast(data, /*to_type=*/utf8(), compute::CastOptions(), &ctx));
+ casted_array_ = internal::checked_pointer_cast<StringArray>(casted);
+ return UpdateRowLengths(row_lengths);
+ }
+
+ // Places string data onto each row in output and updates the corresponding
+ // row pointers in preparation for calls to other (preceding) ColumnPopulators.
+ // Args:
+ // output: character buffer to write to.
+ // offsets: an array of end of row column within the output buffer (values are
+ // one past the end of the position to write to).
+ virtual void PopulateColumns(char* output, int32_t* offsets) const = 0;
+
+ protected:
+ // Type-specific length accounting, invoked once casted_array_ has been set.
+ virtual Status UpdateRowLengths(int32_t* row_lengths) = 0;
+ // String rendering of the most recent array passed to UpdateRowLengths.
+ std::shared_ptr<StringArray> casted_array_;
+ // Character written after each value of this column.
+ const char end_char_;
+
+ private:
+ MemoryPool* const pool_;
+};
+
+// Copies the contents of s into the output buffer back-to-front, escaping every
+// quote character (") by writing it twice.
+//
+// Args:
+//   s: the unescaped text to copy; may be empty.
+//   out_end: position of the last byte to write. Bytes are stored at out_end and at
+//     successively lower addresses.
+// Returns the position just before the lowest byte written (out_end is decremented
+// once per byte stored), which is out_end itself when s is empty.
+char* EscapeReverse(arrow::util::string_view s, char* out_end) {
+  // Iterate by index instead of by pointer: stepping a pointer to one before
+  // s.data(), or doing arithmetic on the possibly-null data() of an empty view, is
+  // undefined behavior.
+  for (size_t remaining = s.length(); remaining > 0; --remaining) {
+    const char c = s[remaining - 1];
+    if (c == '"') {
+      // Extra quote that escapes the one stored below.
+      *out_end-- = c;
+    }
+    *out_end-- = c;
+  }
+  return out_end;
+}
+
+// Populator for non-string types. This populator relies on the compute Cast
+// functionality to convert values to String; if no such cast exists it is an error.
+// It also assumes the resulting string from a cast does not require quoting or
+// escaping.
+class UnquotedColumnPopulator : public ColumnPopulator {
+ public:
+ explicit UnquotedColumnPopulator(MemoryPool* memory_pool, char end_char)
+ : ColumnPopulator(memory_pool, end_char) {}
+
+ // Adds the length of each casted value to the matching entry of row_lengths.
+ // The end character is not counted here.
+ Status UpdateRowLengths(int32_t* row_lengths) override {
+ for (int x = 0; x < casted_array_->length(); x++) {
+ row_lengths[x] += casted_array_->value_length(x);
+ }
+ return Status::OK();
+ }
+
+ // For every row, writes the casted value followed by end_char_ so that it ends at
+ // output + *offsets, then moves that row's offset back by the number of bytes
+ // written so the preceding column can be placed in front of it.
+ void PopulateColumns(char* output, int32_t* offsets) const override {
+ VisitArrayDataInline<StringType>(
+ *casted_array_->data(),
+ [&](arrow::util::string_view s) {
+ // Bytes consumed by this field: the value plus its end character.
+ int64_t next_column_offset = s.length() + /*end_char*/ 1;
+ memcpy((output + *offsets - next_column_offset), s.data(), s.length());
+ *(output + *offsets - 1) = end_char_;
+ *offsets -= static_cast<int32_t>(next_column_offset);
+ offsets++;
+ },
+ [&]() {
+ // Nulls are empty (unquoted) to distinguish with empty string.
+ *(output + *offsets - 1) = end_char_;
+ *offsets -= 1;
+ offsets++;
+ });
+ }
+};
+
+// Strings need special handling to ensure they are escaped properly.
+// This class handles escaping assuming that all strings will be quoted
+// and that the only character within the string that needs to be escaped is
+// a quote character (") and escaping is done by adding another quote.
+class QuotedColumnPopulator : public ColumnPopulator {
+ public:
+ QuotedColumnPopulator(MemoryPool* pool, char end_char)
+ : ColumnPopulator(pool, end_char) {}
+
+ // Adds to each entry of row_lengths the rendered size of the matching value: its
+ // length, one extra character per quote to escape, and the surrounding quote pair.
+ // Null values add nothing. Also records, per row, whether escaping is required so
+ // that PopulateColumns can take a plain memcpy path for rows that need none.
+ Status UpdateRowLengths(int32_t* row_lengths) override {
+ const StringArray& input = *casted_array_;
+ int row_number = 0;
+ row_needs_escaping_.resize(casted_array_->length());
+ VisitArrayDataInline<StringType>(
+ *input.data(),
+ [&](arrow::util::string_view s) {
+ int64_t escaped_count = CountEscapes(s);
+ // TODO: Maybe use 64 bit row lengths or safe cast?
+ row_needs_escaping_[row_number] = escaped_count > 0;
+ row_lengths[row_number] += static_cast<int32_t>(s.length()) +
+ static_cast<int32_t>(escaped_count + kQuoteCount);
+ row_number++;
+ },
+ [&]() {
+ row_needs_escaping_[row_number] = false;
+ row_number++;
+ });
+ return Status::OK();
+ }
+
+ // For every row, writes the quoted (and, where needed, escaped) value followed by
+ // end_char_ so that it ends at output + *offsets, then moves that row's offset back
+ // by the number of bytes written. Relies on row_needs_escaping_ as filled in by the
+ // preceding UpdateRowLengths call.
+ void PopulateColumns(char* output, int32_t* offsets) const override {
+ auto needs_escaping = row_needs_escaping_.begin();
+ VisitArrayDataInline<StringType>(
+ *(casted_array_->data()),
+ [&](arrow::util::string_view s) {
+ // One past the last byte of this field (its end character is at row_end - 1).
+ char* row_end = output + *offsets;
+ // Total bytes this field occupies; computed below.
+ int32_t next_column_offset = 0;
+ if (!*needs_escaping) {
+ next_column_offset = static_cast<int32_t>(s.length() + kQuoteDelimiterCount);
+ memcpy(row_end - next_column_offset + /*quote_offset=*/1, s.data(),
+ s.length());
+ } else {
+ // Adjust row_end by 3: 1 quote char, 1 end char and 1 to position at the
+ // first position to write to.
+ // EscapeReverse returns the position just before the escaped content, which
+ // is where the opening quote goes.
+ next_column_offset =
+ static_cast<int32_t>(row_end - EscapeReverse(s, row_end - 3));
+ }
+ *(row_end - next_column_offset) = '"';
+ *(row_end - 2) = '"';
+ *(row_end - 1) = end_char_;
+ *offsets -= next_column_offset;
+ offsets++;
+ needs_escaping++;
+ },
+ [&]() {
+ // Nulls are empty (unquoted) to distinguish with empty string.
+ *(output + *offsets - 1) = end_char_;
+ *offsets -= 1;
+ offsets++;
+ needs_escaping++;
+ });
+ }
+
+ private:
+ // Older version of GCC don't support custom allocators
+ // at some point we should change this to use memory_pool
+ // backed allocator.
+ // One flag per row of the current array: true if the value contains a quote.
+ std::vector<bool> row_needs_escaping_;
+};
+
+// Type visitor that selects the ColumnPopulator implementation for a data type.
+// On success `populator` points to a newly allocated populator that the caller must
+// take ownership of; on failure `populator` is left untouched.
+struct PopulatorFactory {
+  // Binary-like types (including fixed size binary) are written quoted and escaped.
+  template <typename TypeClass>
+  enable_if_t<is_base_binary_type<TypeClass>::value ||
+                  std::is_same<FixedSizeBinaryType, TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new QuotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  // Dictionary types are handled according to the type of their values.
+  template <typename TypeClass>
+  enable_if_dictionary<TypeClass, Status> Visit(const TypeClass& type) {
+    return VisitTypeInline(*type.value_type(), this);
+  }
+
+  // Nested and extension types cannot be written.
+  template <typename TypeClass>
+  enable_if_t<is_nested_type<TypeClass>::value || is_extension_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    // Status::Invalid concatenates its arguments verbatim, so the separating space
+    // has to be part of the prefix.
+    return Status::Invalid("Unsupported Type: ", type.ToString());
+  }
+
+  // Numeric, boolean, decimal, null and temporal types are written unquoted.
+  template <typename TypeClass>
+  enable_if_t<is_primitive_ctype<TypeClass>::value || is_decimal_type<TypeClass>::value ||
+                  is_null_type<TypeClass>::value || is_temporal_type<TypeClass>::value,
+              Status>
+  Visit(const TypeClass& type) {
+    populator = new UnquotedColumnPopulator(pool, end_char);
+    return Status::OK();
+  }
+
+  // Character the created populator writes after each value.
+  char end_char;
+  // Memory pool handed to the created populator.
+  MemoryPool* pool;
+  // Output: the created populator (owning raw pointer, see above).
+  ColumnPopulator* populator;
+};
+
+// Creates the populator suited to the type of `field`. `end_char` is the character
+// the populator writes after each value and `pool` is used for its allocations.
+// Returns an error if the field's type is not supported.
+Result<std::unique_ptr<ColumnPopulator>> MakePopulator(const Field& field, char end_char,
+                                                       MemoryPool* pool) {
+  PopulatorFactory type_visitor{end_char, pool, /*populator=*/nullptr};
+  RETURN_NOT_OK(VisitTypeInline(*field.type(), &type_visitor));
+  // The visitor hands back an owning raw pointer; wrap it right away.
+  return std::unique_ptr<ColumnPopulator>(type_visitor.populator);
+}
+
+// Converts RecordBatches/Tables that share one schema into CSV text and writes the
+// text to an output stream.
+//
+// Data is processed in slices of at most WriteOptions::batch_size rows. Each slice is
+// rendered in two passes: the populators first compute the length of every row, then
+// fill a single buffer of exactly that size from the last column to the first.
+class CSVConverter {
+ public:
+  // Builds a converter for `schema` with one ColumnPopulator per field. Every field
+  // is terminated by ',' except the last one, which is terminated by '\n'.
+  // Returns an error if a field has a type that cannot be written.
+  static Result<std::unique_ptr<CSVConverter>> Make(std::shared_ptr<Schema> schema,
+                                                    MemoryPool* pool) {
+    std::vector<std::unique_ptr<ColumnPopulator>> populators(schema->num_fields());
+    for (int col = 0; col < schema->num_fields(); col++) {
+      char end_char = col < schema->num_fields() - 1 ? ',' : '\n';
+      ASSIGN_OR_RAISE(populators[col],
+                      MakePopulator(*schema->field(col), end_char, pool));
+    }
+    return std::unique_ptr<CSVConverter>(
+        new CSVConverter(std::move(schema), std::move(populators), pool));
+  }
+
+  // Writes the optional header followed by the rows of `batch` to `out`.
+  // Returns Invalid if options.batch_size is smaller than 1.
+  Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+                  io::OutputStream* out) {
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    RecordBatchIterator iterator = RecordBatchSliceIterator(batch, options.batch_size);
+    for (auto maybe_slice : iterator) {
+      ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
+      RETURN_NOT_OK(TranslateMinimalBatch(*slice));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+    }
+    return Status::OK();
+  }
+
+  // Writes the optional header followed by the rows of `table` to `out`.
+  // Returns Invalid if options.batch_size is smaller than 1.
+  Status WriteCSV(const Table& table, const WriteOptions& options,
+                  io::OutputStream* out) {
+    // Validate the options (and emit the header) before configuring the reader.
+    RETURN_NOT_OK(PrepareForContentsWrite(options, out));
+    TableBatchReader reader(table);
+    reader.set_chunksize(options.batch_size);
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader.ReadNext(&batch));
+    while (batch != nullptr) {
+      RETURN_NOT_OK(TranslateMinimalBatch(*batch));
+      RETURN_NOT_OK(out->Write(data_buffer_));
+      RETURN_NOT_OK(reader.ReadNext(&batch));
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  CSVConverter(std::shared_ptr<Schema> schema,
+               std::vector<std::unique_ptr<ColumnPopulator>> populators, MemoryPool* pool)
+      : column_populators_(std::move(populators)),
+        // The allocator's value type matches the element type of offsets_.
+        offsets_(0, 0, ::arrow::stl::allocator<int32_t>(pool)),
+        schema_(std::move(schema)),
+        pool_(pool) {}
+
+  // Validates the options, lazily allocates the scratch buffer and writes the header
+  // when requested.
+  Status PrepareForContentsWrite(const WriteOptions& options, io::OutputStream* out) {
+    // A batch size of zero would make the slice iterator yield empty slices forever.
+    if (options.batch_size < 1) {
+      return Status::Invalid("WriteOptions: batch_size must be at least 1: ",
+                             options.batch_size);
+    }
+    if (data_buffer_ == nullptr) {
+      // Widen before multiplying so that a large batch size cannot overflow 32 bits.
+      const int64_t initial_size = static_cast<int64_t>(options.batch_size) *
+                                   schema_->num_fields() * kColumnSizeGuess;
+      ASSIGN_OR_RAISE(data_buffer_, AllocateResizableBuffer(initial_size, pool_));
+    }
+    if (options.include_header) {
+      RETURN_NOT_OK(WriteHeader(out));
+    }
+    return Status::OK();
+  }
+
+  // Number of bytes of the header line: every column name with its quotes doubled,
+  // surrounded by a quote pair and followed by one delimiter character.
+  int64_t CalculateHeaderSize() const {
+    int64_t header_length = 0;
+    for (int col = 0; col < schema_->num_fields(); col++) {
+      const std::string& col_name = schema_->field(col)->name();
+      header_length += col_name.size();
+      header_length += CountEscapes(col_name);
+    }
+    return header_length + (kQuoteDelimiterCount * schema_->num_fields());
+  }
+
+  // Renders the quoted column names into data_buffer_ (from the last column to the
+  // first) and writes the resulting line to `out`.
+  Status WriteHeader(io::OutputStream* out) {
+    if (schema_->num_fields() == 0) {
+      // There is no header text for a schema without columns, and the code below
+      // would store the trailing '\n' before the start of an empty buffer.
+      return Status::OK();
+    }
+    RETURN_NOT_OK(data_buffer_->Resize(CalculateHeaderSize(), /*shrink_to_fit=*/false));
+    char* next =
+        reinterpret_cast<char*>(data_buffer_->mutable_data() + data_buffer_->size() - 1);
+    for (int col = schema_->num_fields() - 1; col >= 0; col--) {
+      *next-- = ',';
+      *next-- = '"';
+      next = EscapeReverse(schema_->field(col)->name(), next);
+      *next-- = '"';
+    }
+    // The delimiter written after the last column becomes the line ending.
+    *(data_buffer_->mutable_data() + data_buffer_->size() - 1) = '\n';
+    DCHECK_EQ(reinterpret_cast<uint8_t*>(next + 1), data_buffer_->data());
+    return out->Write(data_buffer_);
+  }
+
+  // Renders `batch` as CSV into data_buffer_, which is resized to exactly the size of
+  // the rendered text. A batch without rows leaves the buffer untouched.
+  Status TranslateMinimalBatch(const RecordBatch& batch) {
+    if (batch.num_rows() == 0) {
+      return Status::OK();
+    }
+    offsets_.resize(batch.num_rows());
+    std::fill(offsets_.begin(), offsets_.end(), 0);
+
+    // Calculate relative offsets for each row (excluding delimiters)
+    for (int32_t col = 0; col < static_cast<int32_t>(column_populators_.size()); col++) {
+      RETURN_NOT_OK(
+          column_populators_[col]->UpdateRowLengths(*batch.column(col), offsets_.data()));
+    }
+    // Calculate cumulative offsets for each row (including delimiters).
+    offsets_[0] += batch.num_columns();
+    for (int64_t row = 1; row < batch.num_rows(); row++) {
+      offsets_[row] += offsets_[row - 1] + /*delimiter lengths*/ batch.num_columns();
+    }
+    // Resize the target buffer to required size. We assume batch to batch sizes
+    // should be pretty close so don't shrink the buffer to avoid allocation churn.
+    RETURN_NOT_OK(data_buffer_->Resize(offsets_.back(), /*shrink_to_fit=*/false));
+
+    // Use the offsets to populate contents. Populators write backwards from the end
+    // of each row, so they are applied from the last column to the first.
+    for (auto populator = column_populators_.rbegin();
+         populator != column_populators_.rend(); populator++) {
+      (*populator)
+          ->PopulateColumns(reinterpret_cast<char*>(data_buffer_->mutable_data()),
+                            offsets_.data());
+    }
+    DCHECK_EQ(0, offsets_[0]);
+    return Status::OK();
+  }
+
+  // Guess of the rendered size of one value, used to size the initial buffer.
+  static constexpr int64_t kColumnSizeGuess = 8;
+  // One populator per schema field, in field order.
+  std::vector<std::unique_ptr<ColumnPopulator>> column_populators_;
+  // Per-row scratch: first row lengths, then cumulative end offsets into the buffer.
+  std::vector<int32_t, arrow::stl::allocator<int32_t>> offsets_;
+  // Scratch buffer holding the text of the header or of the current slice.
+  std::shared_ptr<ResizableBuffer> data_buffer_;
+  const std::shared_ptr<Schema> schema_;
+  MemoryPool* pool_;
+};
+
+} // namespace
+
+// Converts `table` to CSV and writes it to `output`. A null `pool` selects the
+// default memory pool.
+Status WriteCSV(const Table& table, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  MemoryPool* const effective_pool = (pool != nullptr) ? pool : default_memory_pool();
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(table.schema(), effective_pool));
+  return converter->WriteCSV(table, options, output);
+}
+
+// Converts `batch` to CSV and writes it to `output`. A null `pool` selects the
+// default memory pool.
+Status WriteCSV(const RecordBatch& batch, const WriteOptions& options, MemoryPool* pool,
+                arrow::io::OutputStream* output) {
+  MemoryPool* const effective_pool = (pool != nullptr) ? pool : default_memory_pool();
+  ASSIGN_OR_RAISE(std::unique_ptr<CSVConverter> converter,
+                  CSVConverter::Make(batch.schema(), effective_pool));
+  return converter->WriteCSV(batch, options, output);
+}
+
+} // namespace csv
+} // namespace arrow
diff --git a/cpp/src/arrow/csv/writer.h b/cpp/src/arrow/csv/writer.h
new file mode 100644
index 0000000..c009d78
--- /dev/null
+++ b/cpp/src/arrow/csv/writer.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/csv/options.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/record_batch.h"
+#include "arrow/table.h"
+
+namespace arrow {
+namespace csv {
+// Functionality for converting Arrow data to Comma separated value text.
+// This library supports all primitive types that can be cast to a StringArray.
+// It applies the following formatting rules:
+// - For non-binary types no quotes surround values. Nulls are represented as the empty
+// string.
+// - For binary types all non-null data is quoted (and quotes within data are escaped
+// with an additional quote).
+// Null values are empty and unquoted.
+// - LF (\n) is always used as a line ending.
+
+/// \brief Converts table to a CSV and writes the results to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const Table& table, const WriteOptions& options,
+ MemoryPool* pool, arrow::io::OutputStream* output);
+/// \brief Converts batch to CSV and writes the results to output.
+/// Experimental
+ARROW_EXPORT Status WriteCSV(const RecordBatch& batch, const WriteOptions& options,
+ MemoryPool* pool, arrow::io::OutputStream* output);
+
+} // namespace csv
+} // namespace arrow
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
new file mode 100644
index 0000000..dc59fef
--- /dev/null
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <memory>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/writer.h"
+#include "arrow/io/memory.h"
+#include "arrow/record_batch.h"
+#include "arrow/result_internal.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace csv {
+
+// One parameterized test case: the data to write, the options to write it with and
+// the exact CSV text expected as a result.
+struct TestParams {
+ std::shared_ptr<RecordBatch> record_batch;
+ WriteOptions options;
+ std::string expected_output;
+};
+
+// Options used by most tests: a batch size of 5 rows, with the header line toggled
+// by `include_header`.
+WriteOptions DefaultTestOptions(bool include_header) {
+  WriteOptions test_options;
+  test_options.include_header = include_header;
+  test_options.batch_size = 5;
+  return test_options;
+}
+
+// Builds the multi-column test cases: an empty and a populated batch over the same
+// three-column schema, each written with and without a header line.
+std::vector<TestParams> GenerateTestCases() {
+ // Column "b\"" has a quote in its name and "c " a trailing space, to exercise
+ // header quoting and escaping.
+ auto abc_schema = schema({
+ {field("a", uint64())},
+ {field("b\"", utf8())},
+ {field("c ", int32())},
+ });
+ auto empty_batch =
+ RecordBatch::Make(abc_schema, /*num_rows=*/0,
+ {
+ ArrayFromJSON(abc_schema->field(0)->type(), "[]"),
+ ArrayFromJSON(abc_schema->field(1)->type(), "[]"),
+ ArrayFromJSON(abc_schema->field(2)->type(), "[]"),
+ });
+ // Six rows (one more than the test batch size of 5) covering nulls in every
+ // column, an empty string and strings containing quotes.
+ auto populated_batch = RecordBatchFromJSON(abc_schema, R"([{"a": 1, "c ": -1},
+ { "a": 1, "b\"": "abc\"efg", "c ": 2324},
+ { "b\"": "abcd", "c ": 5467},
+ { },
+ { "a": 546, "b\"": "", "c ": 517 },
+ { "a": 124, "b\"": "a\"\"b\"" }])");
+ // Nulls are rendered as empty unquoted fields; strings are always quoted.
+ std::string expected_without_header = std::string("1,,-1") + "\n" + // line 1
+ +R"(1,"abc""efg",2324)" + "\n" + // line 2
+ R"(,"abcd",5467)" + "\n" + // line 3
+ R"(,,)" + "\n" + // line 4
+ R"(546,"",517)" + "\n" + // line 5
+ R"(124,"a""""b""",)" + "\n"; // line 6
+ // Column names are always quoted in the header.
+ std::string expected_header = std::string(R"("a","b""","c ")") + "\n";
+
+ return std::vector<TestParams>{
+ {empty_batch, DefaultTestOptions(/*header=*/false), ""},
+ {empty_batch, DefaultTestOptions(/*header=*/true), expected_header},
+ {populated_batch, DefaultTestOptions(/*header=*/false), expected_without_header},
+ {populated_batch, DefaultTestOptions(/*header=*/true),
+ expected_header + expected_without_header}};
+}
+
+// Fixture for the parameterized CSV writer tests.
+class TestWriteCSV : public ::testing::TestWithParam<TestParams> {
+ protected:
+  // Writes `data` (a RecordBatch or a Table) as CSV into an in-memory stream using
+  // the default memory pool and returns the produced text.
+  template <typename Data>
+  Result<std::string> ToCsvString(const Data& data, const WriteOptions& options) {
+    ASSIGN_OR_RAISE(std::shared_ptr<io::BufferOutputStream> sink,
+                    io::BufferOutputStream::Create());
+    RETURN_NOT_OK(WriteCSV(data, options, default_memory_pool(), sink.get()));
+
+    ASSIGN_OR_RAISE(std::shared_ptr<Buffer> contents, sink->Finish());
+    const char* text = reinterpret_cast<const char*>(contents->data());
+    return std::string(text, contents->size());
+  }
+};
+
+// Checks that a test case produces its expected CSV text regardless of the batch
+// size and of whether the data is passed as a RecordBatch or as a Table.
+TEST_P(TestWriteCSV, TestWrite) {
+  // The output stream is created inside ToCsvString; none is needed here.
+  const TestParams& params = GetParam();
+  WriteOptions options = params.options;
+  std::string csv;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*params.record_batch, options));
+  EXPECT_EQ(csv, params.expected_output);
+
+  // Batch size shouldn't matter.
+  options.batch_size /= 2;
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*params.record_batch, options));
+  EXPECT_EQ(csv, params.expected_output);
+
+  // Table and Record batch should work identically.
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> table,
+                       Table::FromRecordBatches({params.record_batch}));
+  ASSERT_OK_AND_ASSIGN(csv, ToCsvString(*table, options));
+  EXPECT_EQ(csv, params.expected_output);
+}
+
+INSTANTIATE_TEST_SUITE_P(MultiColumnWriteCSVTest, TestWriteCSV,
+ ::testing::ValuesIn(GenerateTestCases()));
+
+INSTANTIATE_TEST_SUITE_P(
+ SingleColumnWriteCSVTest, TestWriteCSV,
+ ::testing::Values(TestParams{
+ RecordBatchFromJSON(schema({field("int64", int64())}),
+ R"([{ "int64": 9999}, {}, { "int64": -15}])"),
+ WriteOptions(),
+ R"("int64")"
+ "\n9999\n\n-15\n"}));
+
+} // namespace csv
+} // namespace arrow
diff --git a/cpp/src/arrow/ipc/json_simple.cc b/cpp/src/arrow/ipc/json_simple.cc
index fba8194..caf6fd0 100644
--- a/cpp/src/arrow/ipc/json_simple.cc
+++ b/cpp/src/arrow/ipc/json_simple.cc
@@ -43,6 +43,7 @@
#include <rapidjson/error/en.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/reader.h>
+#include <rapidjson/writer.h>
namespace rj = arrow::rapidjson;
@@ -652,8 +653,11 @@ class StructConverter final : public ConcreteConverter<StructConverter> {
}
}
if (remaining > 0) {
+ rj::StringBuffer sb;
+ rj::Writer<rj::StringBuffer> writer(sb);
+ json_obj.Accept(writer);
return Status::Invalid("Unexpected members in JSON object for type ",
- type_->ToString());
+ type_->ToString(), " Object: ", sb.GetString());
}
return builder_->Append();
}
diff --git a/cpp/src/arrow/util/config.h.cmake b/cpp/src/arrow/util/config.h.cmake
index 8f8dea0..be6686f 100644
--- a/cpp/src/arrow/util/config.h.cmake
+++ b/cpp/src/arrow/util/config.h.cmake
@@ -34,6 +34,8 @@
#define ARROW_PACKAGE_KIND "@ARROW_PACKAGE_KIND@"
+#cmakedefine ARROW_COMPUTE
+
#cmakedefine ARROW_S3
#cmakedefine ARROW_USE_NATIVE_INT128
diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h
index 75ccf28..771b209 100644
--- a/cpp/src/arrow/util/iterator.h
+++ b/cpp/src/arrow/util/iterator.h
@@ -64,7 +64,8 @@ template <typename T>
class Iterator : public util::EqualityComparable<Iterator<T>> {
public:
/// \brief Iterator may be constructed from any type which has a member function
- /// with signature Status Next(T*);
+ /// with signature Result<T> Next();
+ /// End of iterator is signalled by returning IterationTraits<T>::End();
///
/// The argument is moved or copied to the heap and kept in a unique_ptr<void>. Only
/// its destructor and its Next method (which are stored in function pointers) are
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index 4068a0b..f5b8e4d 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -30,10 +30,13 @@ from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
RecordBatchReader, ensure_type,
maybe_unbox_memory_pool, get_input_stream,
- native_transcoding_input_stream,
+ get_writer, native_transcoding_input_stream,
+ pyarrow_unwrap_batch, pyarrow_unwrap_table,
pyarrow_wrap_schema, pyarrow_wrap_table,
- pyarrow_wrap_data_type, pyarrow_unwrap_data_type)
+ pyarrow_wrap_data_type, pyarrow_unwrap_data_type,
+ Table, RecordBatch)
from pyarrow.lib import frombytes, tobytes
+from pyarrow.util import _stringify_path
cdef unsigned char _single_char(s) except 0:
@@ -763,3 +766,99 @@ def open_csv(input_file, read_options=None, parse_options=None,
move(c_convert_options),
maybe_unbox_memory_pool(memory_pool))
return reader
+
+
+cdef class WriteOptions(_Weakrefable):
+ """
+ Options for writing CSV files.
+
+ Parameters
+ ----------
+ include_header : bool, optional (default True)
+ Whether to write an initial header line with column names
+ batch_size : int, optional (default 1024)
+ How many rows to process together when converting and writing
+ CSV data
+ """
+ cdef:
+ # The wrapped C++ arrow::csv::WriteOptions value
+ CCSVWriteOptions options
+
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, *, include_header=None, batch_size=None):
+ # Start from the C++ defaults and override only what was passed explicitly
+ self.options = CCSVWriteOptions.Defaults()
+ if include_header is not None:
+ self.include_header = include_header
+ if batch_size is not None:
+ self.batch_size = batch_size
+
+ @property
+ def include_header(self):
+ """
+ Whether to write an initial header line with column names.
+ """
+ return self.options.include_header
+
+ @include_header.setter
+ def include_header(self, value):
+ self.options.include_header = value
+
+ @property
+ def batch_size(self):
+ """
+ How many rows to process together when converting and writing
+ CSV data.
+ """
+ return self.options.batch_size
+
+ @batch_size.setter
+ def batch_size(self, value):
+ self.options.batch_size = value
+
+
+cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
+    # Copy the wrapped C++ options into `out`, falling back to the C++ defaults
+    # when no WriteOptions object was given.
+    if write_options is not None:
+        out[0] = write_options.options
+        return
+    out[0] = CCSVWriteOptions.Defaults()
+
+
+def write_csv(data, output_file, write_options=None,
+              MemoryPool memory_pool=None):
+    """
+    Write record batch or table to a CSV file.
+
+    Parameters
+    ----------
+    data : pyarrow.RecordBatch or pyarrow.Table
+        The data to write.
+    output_file : string, path, pyarrow.OutputStream or file-like object
+        The location where to write the CSV data.
+    write_options : pyarrow.csv.WriteOptions
+        Options to configure writing the CSV data.
+    memory_pool : MemoryPool, optional
+        Pool for temporary allocations.
+
+    Raises
+    ------
+    TypeError
+        If `data` is neither a RecordBatch nor a Table.
+    """
+    cdef:
+        shared_ptr[COutputStream] stream
+        CCSVWriteOptions c_write_options
+        CMemoryPool* c_memory_pool
+        CRecordBatch* batch
+        CTable* table
+
+    # Reject unsupported inputs up front, before get_writer() is asked to open
+    # the destination, so that a bad call does not touch the output location.
+    if not isinstance(data, (RecordBatch, Table)):
+        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")
+
+    _get_write_options(write_options, &c_write_options)
+
+    get_writer(output_file, &stream)
+    c_memory_pool = maybe_unbox_memory_pool(memory_pool)
+    if isinstance(data, RecordBatch):
+        batch = pyarrow_unwrap_batch(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool,
+                                  stream.get()))
+    else:
+        table = pyarrow_unwrap_table(data).get()
+        with nogil:
+            check_status(WriteCSV(deref(table), c_write_options, c_memory_pool,
+                                  stream.get()))
diff --git a/python/pyarrow/csv.py b/python/pyarrow/csv.py
index b116ea1..fc1dcaf 100644
--- a/python/pyarrow/csv.py
+++ b/python/pyarrow/csv.py
@@ -18,4 +18,5 @@
from pyarrow._csv import ( # noqa
ReadOptions, ParseOptions, ConvertOptions, ISO8601,
- open_csv, read_csv, CSVStreamingReader)
+ open_csv, read_csv, CSVStreamingReader, write_csv,
+ WriteOptions)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index ba3c3ad..a4f6f18 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1618,6 +1618,13 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
@staticmethod
CCSVReadOptions Defaults()
+ cdef cppclass CCSVWriteOptions" arrow::csv::WriteOptions":
+ # Cython declaration of the C++ struct arrow::csv::WriteOptions.
+ # Whether to write an initial header line with column names.
+ c_bool include_header
+ # How many rows to process together when converting and writing.
+ int32_t batch_size
+
+ # Returns an options struct populated with the library defaults.
+ @staticmethod
+ CCSVWriteOptions Defaults()
+
cdef cppclass CCSVReader" arrow::csv::TableReader":
@staticmethod
CResult[shared_ptr[CCSVReader]] Make(
@@ -1633,6 +1640,11 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
CMemoryPool*, shared_ptr[CInputStream],
CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions)
+ # Two overloads of WriteCSV from "arrow/csv/api.h": one taking a Table,
+ # one taking a RecordBatch. Both take the write options, a memory pool
+ # and the destination output stream, and return a status.
+ cdef CStatus WriteCSV(
+ CTable&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+ cdef CStatus WriteCSV(
+ CRecordBatch&, CCSVWriteOptions& options, CMemoryPool*, COutputStream*)
+
cdef extern from "arrow/json/options.h" nogil:
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index 462fe01..5ca31ae 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -36,7 +36,8 @@ import numpy as np
import pyarrow as pa
from pyarrow.csv import (
- open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601)
+ open_csv, read_csv, ReadOptions, ParseOptions, ConvertOptions, ISO8601,
+ write_csv, WriteOptions)
def generate_col_names():
@@ -203,6 +204,21 @@ def test_convert_options():
assert opts.timestamp_parsers == [ISO8601, '%Y-%m-%d']
+def test_write_options():
+ # Checks the WriteOptions attributes: include_header via the shared
+ # options helper, batch_size by direct get/set and constructor keyword.
+ cls = WriteOptions
+ opts = cls()
+
+ # check_options_class is a helper defined elsewhere in this test
+ # module; presumably it exercises each listed attribute value.
+ check_options_class(
+ cls, include_header=[True, False])
+
+ # The default batch size must be positive, and the attribute writable.
+ assert opts.batch_size > 0
+ opts.batch_size = 12345
+ assert opts.batch_size == 12345
+
+ # batch_size can also be given as a constructor keyword.
+ opts = cls(batch_size=9876)
+ assert opts.batch_size == 9876
+
+
class BaseTestCSVRead:
def read_bytes(self, b, **kwargs):
@@ -1257,3 +1273,22 @@ def test_read_csv_does_not_close_passed_file_handles():
buf = io.BytesIO(b"a,b,c\n1,2,3\n4,5,6")
read_csv(buf)
assert not buf.closed
+
+
+def test_write_read_round_trip():
+ # Writes a small table (and its first record batch) to an in-memory
+ # buffer with write_csv, reads it back with read_csv, and expects the
+ # original table in both the header and the no-header case.
+ t = pa.Table.from_arrays([[1, 2, 3], ["a", "b", "c"]], ["c1", "c2"])
+ # The table has 3 rows, so with max_chunksize=4 the first batch is
+ # expected to carry all of them.
+ record_batch = t.to_batches(max_chunksize=4)[0]
+ for data in [t, record_batch]:
+ # Test with header
+ buf = io.BytesIO()
+ write_csv(data, buf, WriteOptions(include_header=True))
+ buf.seek(0)
+ assert t == read_csv(buf)
+
+ # Test without header
+ buf = io.BytesIO()
+ write_csv(data, buf, WriteOptions(include_header=False))
+ buf.seek(0)
+
+ # The written data has no header line, so the column names are
+ # passed to the reader explicitly.
+ read_options = ReadOptions(column_names=t.column_names)
+ assert t == read_csv(buf, read_options=read_options)