Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/09 01:22:01 UTC
[arrow] branch master updated: ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b4c1763 ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor
b4c1763 is described below
commit b4c1763024f6aabb562fb0b83643a71a6e8be5a1
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 8 20:21:42 2019 -0500
ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor
* Write Arrow arrays directly to ColumnWriter, to allow internal optimizations and other features, like direct DictionaryArray writes
* Refactor and streamline implementation for maintainability and readability
* Move Arrow reader/writer properties to parquet/properties.h
* Some minor code reorganization to help with the above
Functionally, the library is unchanged
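For orientation, here is a minimal sketch of the high-level write path, which after this change funnels into parquet::ColumnWriter::WriteArrow internally. The variables table (a std::shared_ptr<::arrow::Table>) and sink (a std::shared_ptr<::arrow::io::OutputStream>) are hypothetical stand-ins, not part of this patch:

    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"

    // Hypothetical inputs: `table` and `sink` are assumed to already exist.
    // Each column chunk written here goes through the new
    // ColumnWriter::WriteArrow entry point under the hood.
    ::arrow::Status st = parquet::arrow::WriteTable(
        *table, ::arrow::default_memory_pool(), sink,
        /*chunk_size=*/64 * 1024,
        parquet::default_writer_properties(),
        parquet::default_arrow_writer_properties());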
Closes #5036 from wesm/ARROW-6152 and squashes the following commits:
b2ddcb3f3 <Wes McKinney> Regenerate arrowExports.cpp
7a227f16f <Wes McKinney> Try to fix CI jobs, fix R use of deprecated API
7bc795174 <Wes McKinney> Fix Python compilation
0b0aaef82 <Wes McKinney> Simplify
3331ff797 <Wes McKinney> IWYU
5e5f00e8d <Wes McKinney> Restore functors, test suite passing again
f832f8e19 <Wes McKinney> Compiles again, but lost a couple functors
1dc38e02e <Wes McKinney> Checkpoint
7bc4f9b15 <Wes McKinney> Refactoring
31e180e63 <Wes McKinney> Some simplification, investigation
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |   5 +-
 cpp/src/parquet/arrow/reader.cc                    |  10 -
 cpp/src/parquet/arrow/reader.h                     |  50 +-
 cpp/src/parquet/arrow/reader_internal.cc           |   4 +-
 cpp/src/parquet/arrow/reader_internal.h            |  17 +-
 cpp/src/parquet/arrow/schema.cc                    |  79 +-
 cpp/src/parquet/arrow/schema.h                     |  10 +-
 cpp/src/parquet/arrow/writer.cc                    | 841 +++-------------------
 cpp/src/parquet/arrow/writer.h                     | 174 +----
 cpp/src/parquet/column_writer.cc                   | 635 ++++++++++++++--
 cpp/src/parquet/column_writer.h                    |  62 ++
 cpp/src/parquet/properties.cc                      |  11 +
 cpp/src/parquet/properties.h                       | 179 ++++-
 cpp/src/parquet/schema.cc                          |   1 -
 cpp/src/parquet/types.cc                           |  76 ++
 cpp/src/parquet/types.h                            |   6 +
 dev/lint/run_iwyu.sh                               |   1 +
 python/pyarrow/_parquet.pxd                        |  40 +-
 r/src/arrowExports.cpp                             |  22 +-
 r/src/parquet.cpp                                  |  19 +-
 20 files changed, 1069 insertions(+), 1173 deletions(-)
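As context for the properties move in the diff below: ArrowReaderProperties and ArrowWriterProperties now live in parquet/properties.h under the parquet namespace. A rough configuration sketch, using only the setters visible in this diff:

    #include "arrow/type.h"
    #include "parquet/properties.h"

    // Reader side: enable multithreaded reads, direct dictionary decoding
    // for column 0, and 64K-row record batches.
    parquet::ArrowReaderProperties reader_props;
    reader_props.set_use_threads(true);
    reader_props.set_read_dictionary(/*column_index=*/0, true);
    reader_props.set_batch_size(64 * 1024);

    // Writer side: Builder methods return Builder*, hence the -> chaining
    // after the first call on the temporary.
    std::shared_ptr<parquet::ArrowWriterProperties> writer_props =
        parquet::ArrowWriterProperties::Builder()
            .coerce_timestamps(::arrow::TimeUnit::MICRO)
            ->allow_truncated_timestamps()
            ->build();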
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index bb39d56..ee4ddcb 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -44,6 +44,7 @@
#include "parquet/arrow/schema.h"
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/test-util.h"
@@ -459,7 +460,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
case ::arrow::Type::DECIMAL: {
const auto& decimal_type =
static_cast<const ::arrow::Decimal128Type&>(values_type);
- byte_width = DecimalSize(decimal_type.precision());
+ byte_width = internal::DecimalSize(decimal_type.precision());
} break;
default:
break;
@@ -470,7 +471,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
break;
case ::arrow::Type::DECIMAL: {
const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
- byte_width = DecimalSize(decimal_type.precision());
+ byte_width = internal::DecimalSize(decimal_type.precision());
} break;
default:
break;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 931fd19..3d1ad76 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
#include <algorithm>
#include <cstring>
-#include <functional>
#include <future>
#include <numeric>
#include <utility>
@@ -33,13 +32,11 @@
#include "arrow/util/thread-pool.h"
#include "parquet/arrow/reader_internal.h"
-#include "parquet/arrow/schema.h"
#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/properties.h"
-#include "parquet/schema-internal.h"
#include "parquet/schema.h"
using arrow::Array;
@@ -76,8 +73,6 @@ using parquet::internal::RecordReader;
namespace parquet {
namespace arrow {
-class ColumnChunkReaderImpl;
-
class ColumnReaderImpl : public ColumnReader {
public:
enum ReaderType { PRIMITIVE, LIST, STRUCT };
@@ -91,11 +86,6 @@ class ColumnReaderImpl : public ColumnReader {
virtual ReaderType type() const = 0;
};
-ArrowReaderProperties default_arrow_reader_properties() {
- static ArrowReaderProperties default_reader_props;
- return default_reader_props;
-}
-
// ----------------------------------------------------------------------
// FileReaderImpl forward declaration
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index d0ce68a..1492e2e 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -20,9 +20,9 @@
#include <cstdint>
#include <memory>
-#include <unordered_set>
#include <vector>
+#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
@@ -39,8 +39,6 @@ class Table;
namespace parquet {
class FileMetaData;
-class ParquetFileReader;
-class ReaderProperties;
class SchemaDescriptor;
namespace arrow {
@@ -49,52 +47,6 @@ class ColumnChunkReader;
class ColumnReader;
class RowGroupReader;
-static constexpr bool DEFAULT_USE_THREADS = false;
-
-// Default number of rows to read when using ::arrow::RecordBatchReader
-static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024;
-
-/// EXPERIMENTAL: Properties for configuring FileReader behavior.
-class PARQUET_EXPORT ArrowReaderProperties {
- public:
- explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
- : use_threads_(use_threads),
- read_dict_indices_(),
- batch_size_(DEFAULT_BATCH_SIZE) {}
-
- void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
-
- bool use_threads() const { return use_threads_; }
-
- void set_read_dictionary(int column_index, bool read_dict) {
- if (read_dict) {
- read_dict_indices_.insert(column_index);
- } else {
- read_dict_indices_.erase(column_index);
- }
- }
- bool read_dictionary(int column_index) const {
- if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) {
- return true;
- } else {
- return false;
- }
- }
-
- void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
-
- int64_t batch_size() const { return batch_size_; }
-
- private:
- bool use_threads_;
- std::unordered_set<int> read_dict_indices_;
- int64_t batch_size_;
-};
-
-/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
-PARQUET_EXPORT
-ArrowReaderProperties default_arrow_reader_properties();
-
// Arrow read adapter class for deserializing Parquet files as Arrow row
// batches.
//
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index f41e875..bfc4094 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -24,7 +24,6 @@
#include <memory>
#include <string>
#include <type_traits>
-#include <utility>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
@@ -32,8 +31,10 @@
#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/compute/kernel.h"
+#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
+#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/int-util.h"
#include "arrow/util/logging.h"
@@ -42,6 +43,7 @@
#include "parquet/arrow/reader.h"
#include "parquet/column_reader.h"
#include "parquet/platform.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cbf44ec..4568e42 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -17,17 +17,19 @@
#pragma once
+#include <cstdint>
#include <deque>
+#include <functional>
#include <memory>
#include <unordered_map>
#include <unordered_set>
+#include <utility>
#include <vector>
-#include "arrow/status.h"
-
#include "parquet/column_reader.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
+#include "parquet/platform.h"
#include "parquet/schema.h"
namespace arrow {
@@ -36,8 +38,6 @@ class Array;
class ChunkedArray;
class DataType;
class Field;
-class MemoryPool;
-class Schema;
} // namespace arrow
@@ -45,17 +45,10 @@ using arrow::Status;
namespace parquet {
-class ColumnDescriptor;
-
-namespace internal {
-
-class RecordReader;
-
-} // namespace internal
+class ArrowReaderProperties;
namespace arrow {
-class ArrowReaderProperties;
class ColumnReaderImpl;
// ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 82c8566..49d0dd9 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -18,18 +18,13 @@
#include "parquet/arrow/schema.h"
#include <string>
-#include <unordered_set>
-#include <utility>
#include <vector>
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"
-#include "parquet/arrow/writer.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
-#include "parquet/schema-internal.h"
#include "parquet/types.h"
using arrow::Field;
@@ -262,7 +257,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
static_cast<const ::arrow::Decimal128Type&>(*field->type());
precision = decimal_type.precision();
scale = decimal_type.scale();
- length = DecimalSize(precision);
+ length = internal::DecimalSize(precision);
PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
} break;
case ArrowTypeId::DATE32:
@@ -351,77 +346,5 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
out);
}
-/// \brief Compute the number of bytes required to represent a decimal of a
-/// given precision. Taken from the Apache Impala codebase. The comment next
-/// to each return value gives the maximum value that can be represented in
-/// 2's complement with the returned number of bytes.
-int32_t DecimalSize(int32_t precision) {
- DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
- << precision;
- DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
- << precision;
-
- switch (precision) {
- case 1:
- case 2:
- return 1; // 127
- case 3:
- case 4:
- return 2; // 32,767
- case 5:
- case 6:
- return 3; // 8,388,607
- case 7:
- case 8:
- case 9:
- return 4; // 2,147,483,647
- case 10:
- case 11:
- return 5; // 549,755,813,887
- case 12:
- case 13:
- case 14:
- return 6; // 140,737,488,355,327
- case 15:
- case 16:
- return 7; // 36,028,797,018,963,967
- case 17:
- case 18:
- return 8; // 9,223,372,036,854,775,807
- case 19:
- case 20:
- case 21:
- return 9; // 2,361,183,241,434,822,606,847
- case 22:
- case 23:
- return 10; // 604,462,909,807,314,587,353,087
- case 24:
- case 25:
- case 26:
- return 11; // 154,742,504,910,672,534,362,390,527
- case 27:
- case 28:
- return 12; // 39,614,081,257,132,168,796,771,975,167
- case 29:
- case 30:
- case 31:
- return 13; // 10,141,204,801,825,835,211,973,625,643,007
- case 32:
- case 33:
- return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
- case 34:
- case 35:
- return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
- case 36:
- case 37:
- case 38:
- return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
- default:
- break;
- }
- DCHECK(false);
- return -1;
-}
-
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index b3cc66b..5ec9b0c 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -18,11 +18,8 @@
#ifndef PARQUET_ARROW_SCHEMA_H
#define PARQUET_ARROW_SCHEMA_H
-#include <cstdint>
#include <memory>
-#include <vector>
-#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
@@ -35,12 +32,11 @@ class Schema;
namespace parquet {
+class ArrowWriterProperties;
class WriterProperties;
namespace arrow {
-class ArrowWriterProperties;
-
PARQUET_EXPORT
::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
const WriterProperties& properties,
@@ -58,11 +54,7 @@ PARQUET_EXPORT
const WriterProperties& properties,
std::shared_ptr<SchemaDescriptor>* out);
-PARQUET_EXPORT
-int32_t DecimalSize(int32_t precision);
-
} // namespace arrow
-
} // namespace parquet
#endif // PARQUET_ARROW_SCHEMA_H
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index ee3b880..fb437f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -18,7 +18,6 @@
#include "parquet/arrow/writer.h"
#include <algorithm>
-#include <cstddef>
#include <deque>
#include <type_traits>
#include <utility>
@@ -28,12 +27,9 @@
#include "arrow/buffer-builder.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
-#include "arrow/util/checked_cast.h"
+#include "arrow/type.h"
#include "arrow/visitor_inline.h"
-#include "arrow/util/logging.h"
-
-#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/column_writer.h"
@@ -70,12 +66,6 @@ using parquet::schema::GroupNode;
namespace parquet {
namespace arrow {
-std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
- static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
- ArrowWriterProperties::Builder().build();
- return default_writer_properties;
-}
-
namespace {
class LevelBuilder {
@@ -293,31 +283,6 @@ Status LevelBuilder::VisitInline(const Array& array) {
return VisitArrayInline(array, this);
}
-struct ColumnWriterContext {
- ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
- : memory_pool(memory_pool), properties(properties) {
- this->data_buffer = AllocateBuffer(memory_pool);
- this->def_levels_buffer = AllocateBuffer(memory_pool);
- }
-
- template <typename T>
- Status GetScratchData(const int64_t num_values, T** out) {
- RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
- *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
- return Status::OK();
- }
-
- MemoryPool* memory_pool;
- ArrowWriterProperties* properties;
-
- // Buffer used for storing the data of an array converted to the physical type
- // as expected by parquet-cpp.
- std::shared_ptr<ResizableBuffer> data_buffer;
-
- // We use the shared ownership of this buffer
- std::shared_ptr<ResizableBuffer> def_levels_buffer;
-};
-
Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
if (type.num_children() != 1) {
@@ -332,7 +297,7 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type
class ArrowColumnWriter {
public:
- ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
+ ArrowColumnWriter(ArrowWriteContext* ctx, ColumnWriter* column_writer,
const SchemaField* schema_field,
const SchemaManifest* schema_manifest)
: ctx_(ctx),
@@ -340,7 +305,35 @@ class ArrowColumnWriter {
schema_field_(schema_field),
schema_manifest_(schema_manifest) {}
- Status Write(const Array& data);
+ Status Write(const Array& data) {
+ if (data.length() == 0) {
+ // Write nothing when length is 0
+ return Status::OK();
+ }
+
+ ::arrow::Type::type values_type;
+ RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
+
+ std::shared_ptr<Array> _values_array;
+ int64_t values_offset = 0;
+ int64_t num_levels = 0;
+ int64_t num_values = 0;
+ LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
+ std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
+ RETURN_NOT_OK(level_builder.GenerateLevels(
+ data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+ &def_levels_buffer, &rep_levels_buffer, &_values_array));
+ const int16_t* def_levels = nullptr;
+ if (def_levels_buffer) {
+ def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
+ }
+ const int16_t* rep_levels = nullptr;
+ if (rep_levels_buffer) {
+ rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
+ }
+ std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
+ return writer_->WriteArrow(def_levels, rep_levels, num_levels, *values_array, ctx_);
+ }
Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
if (data.length() == 0) {
@@ -394,646 +387,24 @@ class ArrowColumnWriter {
}
private:
- template <typename ParquetType, typename ArrowType>
- Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels);
-
- Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels);
-
- Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels,
- const ArrowWriterProperties& properties);
-
- template <typename ParquetType, typename ArrowType>
- Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
- int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels,
- const typename ArrowType::c_type* values);
-
- template <typename ParquetType, typename ArrowType>
- Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels,
- const uint8_t* valid_bits, int64_t valid_bits_offset,
- const typename ArrowType::c_type* values);
-
- template <typename ParquetType>
- Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels,
- const typename ParquetType::c_type* values) {
- auto typed_writer =
- ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
- // WriteBatch was called with type mismatching the writer_'s type. This
- // could be a schema conversion problem.
- DCHECK(typed_writer);
- PARQUET_CATCH_NOT_OK(
- typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
- return Status::OK();
- }
-
- template <typename ParquetType>
- Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- const typename ParquetType::c_type* values) {
- auto typed_writer =
- ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
- // WriteBatchSpaced was called with type mismatching the writer_'s type. This
- // could be a schema conversion problem.
- DCHECK(typed_writer);
- PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
- num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
- return Status::OK();
- }
-
- ColumnWriterContext* ctx_;
+ ArrowWriteContext* ctx_;
ColumnWriter* writer_;
const SchemaField* schema_field_;
const SchemaManifest* schema_manifest_;
};
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
- const int16_t* def_levels,
- const int16_t* rep_levels) {
- using ArrowCType = typename ArrowType::c_type;
-
- const auto& data = static_cast<const PrimitiveArray&>(array);
- const ArrowCType* values = nullptr;
- // The values buffer may be null if the array is empty (ARROW-2744)
- if (data.values() != nullptr) {
- values = reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
- } else {
- DCHECK_EQ(data.length(), 0);
- }
-
- if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
- // no nulls, just dump the data
- RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
- static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
- def_levels, rep_levels, values)));
- } else {
- const uint8_t* valid_bits = data.null_bitmap_data();
- RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
- static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
- def_levels, rep_levels, valid_bits, data.offset(), values)));
- }
- return Status::OK();
-}
-
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::WriteNonNullableBatch(
- const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels,
- const typename ArrowType::c_type* values) {
- using ParquetCType = typename ParquetType::c_type;
- ParquetCType* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
-
- std::copy(values, values + num_values, buffer);
-
- return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
- const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
- int32_t* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
-
- for (int i = 0; i < num_values; i++) {
- buffer[i] = static_cast<int32_t>(values[i] / 86400000);
- }
-
- return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
- const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
- int32_t* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
- if (type.unit() == TimeUnit::SECOND) {
- for (int i = 0; i < num_values; i++) {
- buffer[i] = values[i] * 1000;
- }
- } else {
- std::copy(values, values + num_values, buffer);
- }
- return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>( \
- const ArrowType& type, int64_t num_values, int64_t num_levels, \
- const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
- return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer); \
- }
-
-NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::WriteNullableBatch(
- const ArrowType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
- using ParquetCType = typename ParquetType::c_type;
-
- ParquetCType* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
- for (int i = 0; i < num_values; i++) {
- buffer[i] = static_cast<ParquetCType>(values[i]);
- }
-
- return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
- valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
- const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const int64_t* values) {
- int32_t* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
-
- for (int i = 0; i < num_values; i++) {
- // Convert from milliseconds into days since the epoch
- buffer[i] = static_cast<int32_t>(values[i] / 86400000);
- }
-
- return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
- valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
- const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const int32_t* values) {
- int32_t* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
-
- if (type.unit() == TimeUnit::SECOND) {
- for (int i = 0; i < num_values; i++) {
- buffer[i] = values[i] * 1000;
- }
- } else {
- for (int i = 0; i < num_values; i++) {
- buffer[i] = values[i];
- }
- }
- return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
- valid_bits_offset, buffer);
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
- template <> \
- Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>( \
- const ArrowType& type, int64_t num_values, int64_t num_levels, \
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
- int64_t valid_bits_offset, const CType* values) { \
- return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
- valid_bits_offset, values); \
- }
-
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
-NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
-
-#define CONV_CASE_LOOP(ConversionFunction) \
- for (int64_t i = 0; i < num_values; i++) \
- ConversionFunction(arrow_values[i], &output[i]);
-
-static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
- int64_t num_values,
- ::arrow::TimeUnit::type unit_type,
- Int96* output) {
- switch (unit_type) {
- case TimeUnit::NANO:
- CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
- break;
- case TimeUnit::MICRO:
- CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
- break;
- case TimeUnit::MILLI:
- CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
- break;
- case TimeUnit::SECOND:
- CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
- break;
- }
-}
-
-#undef CONV_CASE_LOOP
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
- const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const int64_t* values) {
- Int96* buffer = nullptr;
- RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-
- ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
-
- return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
- valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
- const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
- Int96* buffer = nullptr;
- RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-
- ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
-
- return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
- const int16_t* def_levels,
- const int16_t* rep_levels) {
- const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
-
- if (ctx_->properties->support_deprecated_int96_timestamps()) {
- // User explicitly requested Int96 timestamps
- return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
- def_levels, rep_levels);
- } else if (ctx_->properties->coerce_timestamps_enabled()) {
- // User explicitly requested coercion to specific unit
- if (source_type.unit() == ctx_->properties->coerce_timestamps_unit()) {
- // No data conversion necessary
- return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
- def_levels, rep_levels);
- } else {
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels,
- *(ctx_->properties));
- }
- } else if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0 &&
- source_type.unit() == TimeUnit::NANO) {
- // Absent superseding user instructions, when writing Parquet version 1.0 files,
- // timestamps in nanoseconds are coerced to microseconds
- std::shared_ptr<ArrowWriterProperties> properties =
- (ArrowWriterProperties::Builder())
- .coerce_timestamps(TimeUnit::MICRO)
- ->disallow_truncated_timestamps()
- ->build();
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
- } else if (source_type.unit() == TimeUnit::SECOND) {
- // Absent superseding user instructions, timestamps in seconds are coerced to
- // milliseconds
- std::shared_ptr<ArrowWriterProperties> properties =
- (ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build();
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
- } else {
- // No data conversion necessary
- return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
- def_levels, rep_levels);
- }
-}
-
-#define COERCE_DIVIDE -1
-#define COERCE_INVALID 0
-#define COERCE_MULTIPLY +1
-
-static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
- // from seconds ...
- {{COERCE_INVALID, 0}, // ... to seconds
- {COERCE_MULTIPLY, 1000}, // ... to millis
- {COERCE_MULTIPLY, 1000000}, // ... to micros
- {COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos
- // from millis ...
- {{COERCE_INVALID, 0},
- {COERCE_MULTIPLY, 1},
- {COERCE_MULTIPLY, 1000},
- {COERCE_MULTIPLY, 1000000}},
- // from micros ...
- {{COERCE_INVALID, 0},
- {COERCE_DIVIDE, 1000},
- {COERCE_MULTIPLY, 1},
- {COERCE_MULTIPLY, 1000}},
- // from nanos ...
- {{COERCE_INVALID, 0},
- {COERCE_DIVIDE, 1000000},
- {COERCE_DIVIDE, 1000},
- {COERCE_MULTIPLY, 1}}};
-
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
- const int16_t* def_levels,
- const int16_t* rep_levels,
- const ArrowWriterProperties& properties) {
- int64_t* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
-
- const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
- auto values = data.raw_values();
-
- const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
- auto source_unit = source_type.unit();
-
- TimeUnit::type target_unit = properties.coerce_timestamps_unit();
- auto target_type = ::arrow::timestamp(target_unit);
- bool truncation_allowed = properties.truncated_timestamps_allowed();
-
- auto DivideBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array.length(); i++) {
- if (!truncation_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
- return Status::Invalid("Casting from ", source_type.ToString(), " to ",
- target_type->ToString(), " would lose data: ", values[i]);
- }
- buffer[i] = values[i] / factor;
- }
- return Status::OK();
- };
-
- auto MultiplyBy = [&](const int64_t factor) {
- for (int64_t i = 0; i < array.length(); i++) {
- buffer[i] = values[i] * factor;
- }
- return Status::OK();
- };
-
- const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
- [static_cast<int>(target_unit)];
- // .first -> coercion operation; .second -> scale factor
- DCHECK_NE(coercion.first, COERCE_INVALID);
- RETURN_NOT_OK(coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
- : MultiplyBy(coercion.second));
-
- if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
- // no nulls, just dump the data
- RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
- static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
- num_levels, def_levels, rep_levels, buffer)));
- } else {
- const uint8_t* valid_bits = data.null_bitmap_data();
- RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
- static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
- num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
- }
-
- return Status::OK();
-}
-
-#undef COERCE_DIVIDE
-#undef COERCE_INVALID
-#undef COERCE_MULTIPLY
-
-// This specialization looks quite similar to the generic version, but it
-// differs in two points:
-// * the offset is added to the pointer as late as possible, since we have
-//   sub-byte access
-// * Arrow data is stored bitwise, so we cannot use std::copy to transform from
-// ArrowType::c_type to ParquetType::c_type
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
- const Array& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- bool* buffer = nullptr;
- RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
-
- const auto& data = static_cast<const BooleanArray&>(array);
- const uint8_t* values = nullptr;
- // The values buffer may be null if the array is empty (ARROW-2744)
- if (data.values() != nullptr) {
- values = reinterpret_cast<const uint8_t*>(data.values()->data());
- } else {
- DCHECK_EQ(data.length(), 0);
- }
-
- int buffer_idx = 0;
- int64_t offset = array.offset();
- for (int i = 0; i < data.length(); i++) {
- if (!data.IsNull(i)) {
- buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
- }
- }
-
- return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
- const Array& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
- const Array& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- ByteArray* buffer = nullptr;
- RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
-
- const auto& data = static_cast<const BinaryArray&>(array);
-
- // In the case of an array consisting only of empty strings or all nulls,
- // data.data() may already be a nullptr, in which case data.data()->data()
- // would segfault.
- const uint8_t* values = nullptr;
- if (data.value_data()) {
- values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
- DCHECK(values != nullptr);
- }
-
- // Slice offset is accounted for in raw_value_offsets
- const int32_t* value_offset = data.raw_value_offsets();
-
- if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
- // no nulls, just dump the data
- for (int64_t i = 0; i < data.length(); i++) {
- buffer[i] =
- ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
- }
- } else {
- int buffer_idx = 0;
- for (int64_t i = 0; i < data.length(); i++) {
- if (!data.IsNull(i)) {
- buffer[buffer_idx++] =
- ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
- }
- }
- }
-
- return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
- const Array& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
- const int64_t length = data.length();
-
- FLBA* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
-
- if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
- // no nulls, just dump the data
- // todo(advancedxy): use a writeBatch to avoid this step
- for (int64_t i = 0; i < length; i++) {
- buffer[i] = FixedLenByteArray(data.GetValue(i));
- }
- } else {
- int buffer_idx = 0;
- for (int64_t i = 0; i < length; i++) {
- if (!data.IsNull(i)) {
- buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
- }
- }
- }
-
- return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
- const Array& array, int64_t num_levels, const int16_t* def_levels,
- const int16_t* rep_levels) {
- const auto& data = static_cast<const Decimal128Array&>(array);
- const int64_t length = data.length();
-
- FLBA* buffer;
- RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
-
- const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
- const int32_t offset =
- decimal_type.byte_width() - DecimalSize(decimal_type.precision());
-
- const bool does_not_have_nulls =
- writer_->descr()->schema_node()->is_required() || data.null_count() == 0;
-
- const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
- std::vector<uint64_t> big_endian_values(valid_value_count);
-
- // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
- // don't have to keep writing two loops to handle the case where we know there are no
- // nulls
- if (does_not_have_nulls) {
- // no nulls, just dump the data
- // todo(advancedxy): use a writeBatch to avoid this step
- for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
- auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
- big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
- big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer[i] = FixedLenByteArray(
- reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
- }
- } else {
- for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
- if (!data.IsNull(i)) {
- auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
- big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
- big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
- buffer[buffer_idx++] = FixedLenByteArray(
- reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
- j += 2;
- }
- }
- }
-
- return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-Status ArrowColumnWriter::Write(const Array& data) {
- if (data.length() == 0) {
- // Write nothing when length is 0
- return Status::OK();
- }
-
- ::arrow::Type::type values_type;
- RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
-
- std::shared_ptr<Array> _values_array;
- int64_t values_offset = 0;
- int64_t num_levels = 0;
- int64_t num_values = 0;
- LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
- std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
- RETURN_NOT_OK(level_builder.GenerateLevels(
- data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
- &def_levels_buffer, &rep_levels_buffer, &_values_array));
- const int16_t* def_levels = nullptr;
- if (def_levels_buffer) {
- def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
- }
- const int16_t* rep_levels = nullptr;
- if (rep_levels_buffer) {
- rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
- }
- std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
-
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \
- case ::arrow::Type::ArrowEnum: \
- return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
- def_levels, rep_levels);
-
- switch (values_type) {
- case ::arrow::Type::UINT32: {
- if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
- // Parquet 1.0 readers cannot read the UINT_32 logical type, so we need
- // to use the larger Int64Type to store the values losslessly.
- return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
- def_levels, rep_levels);
- } else {
- return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
- def_levels, rep_levels);
- }
- }
- WRITE_BATCH_CASE(NA, NullType, Int32Type)
- case ::arrow::Type::TIMESTAMP:
- return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
- WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
- WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
- WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
- WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
- WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
- WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
- WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
- WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
- WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
- WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
- WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
- WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
- WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
- WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
- WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
- WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
- WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
- WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
- default:
- break;
- }
- return Status::NotImplemented("Data type not supported as list value: ",
- values_array->type()->ToString());
-}
-
} // namespace
// ----------------------------------------------------------------------
// FileWriter implementation
-class FileWriter::Impl {
+class FileWriterImpl : public FileWriter {
public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
- : writer_(std::move(writer)),
+ FileWriterImpl(const std::shared_ptr<::arrow::Schema>& schema, MemoryPool* pool,
+ std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ : schema_(schema),
+ writer_(std::move(writer)),
row_group_writer_(nullptr),
column_write_context_(pool, arrow_properties.get()),
arrow_properties_(arrow_properties),
@@ -1044,7 +415,7 @@ class FileWriter::Impl {
&schema_manifest_);
}
- Status NewRowGroup(int64_t chunk_size) {
+ Status NewRowGroup(int64_t chunk_size) override {
if (row_group_writer_ != nullptr) {
PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
}
@@ -1052,7 +423,7 @@ class FileWriter::Impl {
return Status::OK();
}
- Status Close() {
+ Status Close() override {
if (!closed_) {
// Make idempotent
closed_ = true;
@@ -1064,7 +435,7 @@ class FileWriter::Impl {
return Status::OK();
}
- Status WriteColumnChunk(const Array& data) {
+ Status WriteColumnChunk(const Array& data) override {
// A bit awkward here, since we cannot instantiate a ChunkedArray from a const Array&
::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
@@ -1072,7 +443,7 @@ class FileWriter::Impl {
}
Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
- const int64_t size) {
+ int64_t size) override {
// DictionaryArrays are not yet handled with a fast path. To still support
// writing them as a workaround, we convert them back to their non-dictionary
// representation.
@@ -1107,51 +478,70 @@ class FileWriter::Impl {
return arrow_writer.Close();
}
- const WriterProperties& properties() const { return *writer_->properties(); }
+ Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) override {
+ return WriteColumnChunk(data, 0, data->length());
+ }
+
+ Status WriteTable(const Table& table, int64_t chunk_size) override {
+ RETURN_NOT_OK(table.Validate());
+
+ if (chunk_size <= 0 && table.num_rows() > 0) {
+ return Status::Invalid("chunk size per row_group must be greater than 0");
+ } else if (!table.schema()->Equals(*schema_, false)) {
+ return Status::Invalid("table schema does not match this writer's. table:'",
+ table.schema()->ToString(), "' this:'", schema_->ToString(),
+ "'");
+ } else if (chunk_size > this->properties().max_row_group_length()) {
+ chunk_size = this->properties().max_row_group_length();
+ }
+
+ auto WriteRowGroup = [&](int64_t offset, int64_t size) {
+ RETURN_NOT_OK(NewRowGroup(size));
+ for (int i = 0; i < table.num_columns(); i++) {
+ RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
+ }
+ return Status::OK();
+ };
+
+ if (table.num_rows() == 0) {
+ // Append a row group with 0 rows
+ RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
+ return Status::OK();
+ }
+
+ for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
+ int64_t offset = chunk * chunk_size;
+ RETURN_NOT_OK_ELSE(
+ WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
+ PARQUET_IGNORE_NOT_OK(Close()));
+ }
+ return Status::OK();
+ }
- ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }
+ const WriterProperties& properties() const { return *writer_->properties(); }
- virtual ~Impl() {}
+ ::arrow::MemoryPool* memory_pool() const override {
+ return column_write_context_.memory_pool;
+ }
- const std::shared_ptr<FileMetaData> metadata() const { return writer_->metadata(); }
+ const std::shared_ptr<FileMetaData> metadata() const override {
+ return writer_->metadata();
+ }
private:
friend class FileWriter;
+ std::shared_ptr<::arrow::Schema> schema_;
+
SchemaManifest schema_manifest_;
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
- ColumnWriterContext column_write_context_;
+ ArrowWriteContext column_write_context_;
std::shared_ptr<ArrowWriterProperties> arrow_properties_;
bool closed_;
};
-Status FileWriter::NewRowGroup(int64_t chunk_size) {
- return impl_->NewRowGroup(chunk_size);
-}
-
-Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
- return impl_->WriteColumnChunk(data);
-}
-
-Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
- const int64_t offset, const int64_t size) {
- return impl_->WriteColumnChunk(data, offset, size);
-}
-
-Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
- return WriteColumnChunk(data, 0, data->length());
-}
-
-Status FileWriter::Close() { return impl_->Close(); }
-
-MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
-
-const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
- return impl_->metadata();
-}
-
FileWriter::~FileWriter() {}
Status FileWriter::Make(::arrow::MemoryPool* pool,
@@ -1159,16 +549,13 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::Schema>& schema,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* out) {
- out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties));
- return (*out)->impl_->Init();
+ std::unique_ptr<FileWriterImpl> impl(
+ new FileWriterImpl(schema, pool, std::move(writer), arrow_properties));
+ RETURN_NOT_OK(impl->Init());
+ *out = std::move(impl);
+ return Status::OK();
}
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<::arrow::Schema>& schema,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
- : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)),
- schema_(schema) {}
-
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
@@ -1206,42 +593,6 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
return Status::OK();
}
-Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
- RETURN_NOT_OK(table.Validate());
-
- if (chunk_size <= 0 && table.num_rows() > 0) {
- return Status::Invalid("chunk size per row_group must be greater than 0");
- } else if (!table.schema()->Equals(*schema_, false)) {
- return Status::Invalid("table schema does not match this writer's. table:'",
- table.schema()->ToString(), "' this:'", schema_->ToString(),
- "'");
- } else if (chunk_size > impl_->properties().max_row_group_length()) {
- chunk_size = impl_->properties().max_row_group_length();
- }
-
- auto WriteRowGroup = [&](int64_t offset, int64_t size) {
- RETURN_NOT_OK(NewRowGroup(size));
- for (int i = 0; i < table.num_columns(); i++) {
- RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
- }
- return Status::OK();
- };
-
- if (table.num_rows() == 0) {
- // Append a row group with 0 rows
- RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
- return Status::OK();
- }
-
- for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
- int64_t offset = chunk * chunk_size;
- RETURN_NOT_OK_ELSE(
- WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
- PARQUET_IGNORE_NOT_OK(Close()));
- }
- return Status::OK();
-}
-
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 8f7d499..b6c0c58 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -19,19 +19,16 @@
#define PARQUET_ARROW_WRITER_H
#include <cstdint>
-#include <cstring>
#include <memory>
#include "parquet/platform.h"
#include "parquet/properties.h"
-#include "parquet/types.h"
-
-#include "arrow/type.h"
namespace arrow {
class Array;
class ChunkedArray;
+class Schema;
class Table;
} // namespace arrow
@@ -43,84 +40,6 @@ class ParquetFileWriter;
namespace arrow {
-class PARQUET_EXPORT ArrowWriterProperties {
- public:
- class Builder {
- public:
- Builder()
- : write_timestamps_as_int96_(false),
- coerce_timestamps_enabled_(false),
- coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
- truncated_timestamps_allowed_(false) {}
- virtual ~Builder() {}
-
- Builder* disable_deprecated_int96_timestamps() {
- write_timestamps_as_int96_ = false;
- return this;
- }
-
- Builder* enable_deprecated_int96_timestamps() {
- write_timestamps_as_int96_ = true;
- return this;
- }
-
- Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
- coerce_timestamps_enabled_ = true;
- coerce_timestamps_unit_ = unit;
- return this;
- }
-
- Builder* allow_truncated_timestamps() {
- truncated_timestamps_allowed_ = true;
- return this;
- }
-
- Builder* disallow_truncated_timestamps() {
- truncated_timestamps_allowed_ = false;
- return this;
- }
-
- std::shared_ptr<ArrowWriterProperties> build() {
- return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
- write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
- truncated_timestamps_allowed_));
- }
-
- private:
- bool write_timestamps_as_int96_;
-
- bool coerce_timestamps_enabled_;
- ::arrow::TimeUnit::type coerce_timestamps_unit_;
- bool truncated_timestamps_allowed_;
- };
-
- bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
-
- bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
- ::arrow::TimeUnit::type coerce_timestamps_unit() const {
- return coerce_timestamps_unit_;
- }
-
- bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
-
- private:
- explicit ArrowWriterProperties(bool write_nanos_as_int96,
- bool coerce_timestamps_enabled,
- ::arrow::TimeUnit::type coerce_timestamps_unit,
- bool truncated_timestamps_allowed)
- : write_timestamps_as_int96_(write_nanos_as_int96),
- coerce_timestamps_enabled_(coerce_timestamps_enabled),
- coerce_timestamps_unit_(coerce_timestamps_unit),
- truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
-
- const bool write_timestamps_as_int96_;
- const bool coerce_timestamps_enabled_;
- const ::arrow::TimeUnit::type coerce_timestamps_unit_;
- const bool truncated_timestamps_allowed_;
-};
-
-std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
-
/**
* Iterative API:
* Start a new RowGroup/Chunk with NewRowGroup
@@ -129,49 +48,41 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_prope
class PARQUET_EXPORT FileWriter {
public:
static ::arrow::Status Make(
- ::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
const std::shared_ptr<::arrow::Schema>& schema,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* out);
- static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
std::unique_ptr<FileWriter>* writer);
static ::arrow::Status Open(
- const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const ::arrow::Schema& schema, MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);
/// \brief Write a Table to Parquet.
- ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
+ virtual ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size) = 0;
- ::arrow::Status NewRowGroup(int64_t chunk_size);
- ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
+ virtual ::arrow::Status NewRowGroup(int64_t chunk_size) = 0;
+ virtual ::arrow::Status WriteColumnChunk(const ::arrow::Array& data) = 0;
/// \brief Write ColumnChunk in row group using slice of a ChunkedArray
- ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
- const int64_t offset, const int64_t size);
- ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);
- ::arrow::Status Close();
+ virtual ::arrow::Status WriteColumnChunk(
+ const std::shared_ptr<::arrow::ChunkedArray>& data, int64_t offset,
+ int64_t size) = 0;
+ virtual ::arrow::Status WriteColumnChunk(
+ const std::shared_ptr<::arrow::ChunkedArray>& data) = 0;
+ virtual ::arrow::Status Close() = 0;
virtual ~FileWriter();
- ::arrow::MemoryPool* memory_pool() const;
-
- const std::shared_ptr<FileMetaData> metadata() const;
-
- private:
- FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
- const std::shared_ptr<::arrow::Schema>& schema,
- const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
-
- class PARQUET_NO_EXPORT Impl;
- std::unique_ptr<Impl> impl_;
- std::shared_ptr<::arrow::Schema> schema_;
+ virtual MemoryPool* memory_pool() const = 0;
+ virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
};
/// \brief Write Parquet file metadata only to indicated Arrow OutputStream
@@ -190,66 +101,13 @@ PARQUET_EXPORT
* The table shall only consist of columns of primitive type or of primitive lists.
*/
::arrow::Status PARQUET_EXPORT WriteTable(
- const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+ const ::arrow::Table& table, MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
default_arrow_writer_properties());
-namespace internal {
-
-/**
- * Timestamp conversion constants
- */
-constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
-
-template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
-inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
- int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
- (*impala_timestamp).value[2] = (uint32_t)julian_days;
-
- int64_t last_day_units = time % UnitPerDay;
- auto last_day_nanos = last_day_units * NanosecondsPerUnit;
- // impala_timestamp will be unaligned every other entry so do memcpy instead
- // of assign and reinterpret cast to avoid undefined behavior.
- std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
-}
-
-constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
-
-inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
- ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
- impala_timestamp);
-}
-
-constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
-
-inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
- Int96* impala_timestamp) {
- ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
- milliseconds, impala_timestamp);
-}
-
-constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
-
-inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
- Int96* impala_timestamp) {
- ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
- microseconds, impala_timestamp);
-}
-
-constexpr int64_t kNanosecondsInNanos = INT64_C(1);
-
-inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
- Int96* impala_timestamp) {
- ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
- nanoseconds, impala_timestamp);
-}
-
-} // namespace internal
-
} // namespace arrow
-
} // namespace parquet
#endif // PARQUET_ARROW_WRITER_H
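
FileWriter is now a pure interface; the WriteTable free function above remains the
simplest entry point. A minimal usage sketch, assuming an existing ::arrow::Table
(the file name and error-handling macro are illustrative, not part of this patch):

    #include "arrow/io/file.h"
    #include "arrow/memory_pool.h"
    #include "arrow/status.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"

    ::arrow::Status WriteExampleTable(const std::shared_ptr<::arrow::Table>& table) {
      std::shared_ptr<::arrow::io::FileOutputStream> sink;
      ARROW_RETURN_NOT_OK(
          ::arrow::io::FileOutputStream::Open("example.parquet", &sink));
      // chunk_size bounds the number of rows written per Parquet row group
      return parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), sink,
                                        /*chunk_size=*/64 * 1024);
    }
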
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 86e21c2..fa16234 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -17,14 +17,17 @@
#include "parquet/column_writer.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <utility>
#include <vector>
+#include "arrow/array.h"
#include "arrow/buffer-builder.h"
-#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
#include "arrow/util/bit-stream-utils.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
@@ -43,6 +46,7 @@
namespace parquet {
+using ::arrow::Status;
using ::arrow::internal::checked_cast;
using BitWriter = ::arrow::BitUtil::BitWriter;
@@ -131,7 +135,7 @@ class SerializedPageWriter : public PageWriter {
public:
SerializedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ MemoryPool* pool = ::arrow::default_memory_pool())
: sink_(sink),
metadata_(metadata),
pool_(pool),
@@ -268,7 +272,7 @@ class SerializedPageWriter : public PageWriter {
private:
std::shared_ptr<ArrowOutputStream> sink_;
ColumnChunkMetaDataBuilder* metadata_;
- ::arrow::MemoryPool* pool_;
+ MemoryPool* pool_;
int64_t num_values_;
int64_t dictionary_page_offset_;
int64_t data_page_offset_;
@@ -286,7 +290,7 @@ class BufferedPageWriter : public PageWriter {
public:
BufferedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ MemoryPool* pool = ::arrow::default_memory_pool())
: final_sink_(sink), metadata_(metadata) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::unique_ptr<SerializedPageWriter>(
@@ -334,8 +338,7 @@ class BufferedPageWriter : public PageWriter {
std::unique_ptr<PageWriter> PageWriter::Open(
const std::shared_ptr<ArrowOutputStream>& sink, Compression::type codec,
- ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool,
- bool buffered_row_group) {
+ ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, bool buffered_row_group) {
if (buffered_row_group) {
return std::unique_ptr<PageWriter>(
new BufferedPageWriter(sink, codec, metadata, pool));
@@ -411,7 +414,9 @@ class ColumnWriterImpl {
void AddDataPage();
// Serializes Data Pages
- void WriteDataPage(const CompressedDataPage& page);
+ void WriteDataPage(const CompressedDataPage& page) {
+ total_bytes_written_ += pager_->WriteDataPage(page);
+ }
// Write multiple definition levels
void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -445,7 +450,7 @@ class ColumnWriterImpl {
LevelEncoder level_encoder_;
- ::arrow::MemoryPool* allocator_;
+ MemoryPool* allocator_;
// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
@@ -586,10 +591,6 @@ void ColumnWriterImpl::AddDataPage() {
num_buffered_encoded_values_ = 0;
}
-void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) {
- total_bytes_written_ += pager_->WriteDataPage(page);
-}
-
int64_t ColumnWriterImpl::Close() {
if (!closed_) {
closed_ = true;
@@ -658,6 +659,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values) override;
+ Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t num_levels, const ::arrow::Array& array,
+ ArrowWriteContext* context) override;
+
int64_t EstimatedBufferedValueBytes() const override {
return current_encoder_->EstimatedDataEncodedSize();
}
@@ -666,7 +671,20 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
std::shared_ptr<Buffer> GetValuesBuffer() override {
return current_encoder_->FlushValues();
}
- void WriteDictionaryPage() override;
+
+ void WriteDictionaryPage() override {
+    // We have to dynamic_cast here because some compilers don't want to cast
+    // through the virtual inheritance of TypedEncoder<Type>
+ auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
+ DCHECK(dict_encoder);
+ std::shared_ptr<ResizableBuffer> buffer =
+ AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
+ dict_encoder->WriteDict(buffer->mutable_data());
+
+ DictionaryPage page(buffer, dict_encoder->num_entries(),
+ properties_->dictionary_page_encoding());
+ total_bytes_written_ += pager_->WriteDictionaryPage(page);
+ }
// Checks if the Dictionary Page size limit is reached
// If the limit is reached, the Dictionary and Data Pages are serialized
@@ -685,7 +703,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
return result;
}
- void ResetPageStatistics() override;
+ void ResetPageStatistics() override {
+ if (chunk_statistics_ != nullptr) {
+ chunk_statistics_->Merge(*page_statistics_);
+ page_statistics_->Reset();
+ }
+ }
Type::type type() const override { return descr_->physical_type(); }
@@ -700,6 +723,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
const WriterProperties* properties() override { return properties_; }
private:
+ using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
+ using TypedStats = TypedStatistics<DType>;
+ std::unique_ptr<Encoder> current_encoder_;
+ std::shared_ptr<TypedStats> page_statistics_;
+ std::shared_ptr<TypedStats> chunk_statistics_;
+
inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
const int16_t* rep_levels, const T* values);
@@ -710,16 +739,16 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
int64_t* num_spaced_written);
// Write values to a temporary buffer before they are encoded into pages
- void WriteValues(int64_t num_values, const T* values);
- void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values);
-
- using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
- std::unique_ptr<Encoder> current_encoder_;
+ void WriteValues(int64_t num_values, const T* values) {
+ dynamic_cast<ValueEncoderType*>(current_encoder_.get())
+ ->Put(values, static_cast<int>(num_values));
+ }
- using TypedStats = TypedStatistics<DType>;
- std::shared_ptr<TypedStats> page_statistics_;
- std::shared_ptr<TypedStats> chunk_statistics_;
+ void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values) {
+ dynamic_cast<ValueEncoderType*>(current_encoder_.get())
+ ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+ }
};
// Only one Dictionary Page is written.
@@ -741,29 +770,6 @@ void TypedColumnWriterImpl<DType>::CheckDictionarySizeLimit() {
}
}
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteDictionaryPage() {
- // We have to dynamic cast here because TypedEncoder<Type> as some compilers
- // don't want to cast through virtual inheritance
- auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
- DCHECK(dict_encoder);
- std::shared_ptr<ResizableBuffer> buffer =
- AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
- dict_encoder->WriteDict(buffer->mutable_data());
-
- DictionaryPage page(buffer, dict_encoder->num_entries(),
- properties_->dictionary_page_encoding());
- total_bytes_written_ += pager_->WriteDictionaryPage(page);
-}
-
-template <typename DType>
-void TypedColumnWriterImpl<DType>::ResetPageStatistics() {
- if (chunk_statistics_ != nullptr) {
- chunk_statistics_->Merge(*page_statistics_);
- page_statistics_->Reset();
- }
-}
-
// ----------------------------------------------------------------------
// Instantiate templated classes
@@ -952,19 +958,534 @@ void TypedColumnWriterImpl<DType>::WriteBatchSpaced(
values + values_offset, &num_spaced_written);
}
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteValues(int64_t num_values, const T* values) {
- dynamic_cast<ValueEncoderType*>(current_encoder_.get())
- ->Put(values, static_cast<int>(num_values));
+// ----------------------------------------------------------------------
+// Direct Arrow write path
+
+template <typename ParquetType, typename ArrowType, typename Enable = void>
+struct SerializeFunctor {
+ using ArrowCType = typename ArrowType::c_type;
+ using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+ using ParquetCType = typename ParquetType::c_type;
+ Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
+ const ArrowCType* input = array.raw_values();
+ if (array.null_count() > 0) {
+ for (int i = 0; i < array.length(); i++) {
+ out[i] = static_cast<ParquetCType>(input[i]);
+ }
+ } else {
+ std::copy(input, input + array.length(), out);
+ }
+ return Status::OK();
+ }
+};
+
+template <typename ParquetType, typename ArrowType>
+inline Status SerializeData(const ::arrow::Array& array, ArrowWriteContext* ctx,
+ typename ParquetType::c_type* out) {
+ using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+ SerializeFunctor<ParquetType, ArrowType> functor;
+ return functor.Serialize(checked_cast<const ArrayType&>(array), ctx, out);
}
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteValuesSpaced(int64_t num_values,
- const uint8_t* valid_bits,
- int64_t valid_bits_offset,
- const T* values) {
- dynamic_cast<ValueEncoderType*>(current_encoder_.get())
- ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+template <typename ParquetType, typename ArrowType>
+Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ ArrowWriteContext* ctx,
+ TypedColumnWriter<ParquetType>* writer) {
+ using ParquetCType = typename ParquetType::c_type;
+
+ ParquetCType* buffer;
+ PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
+
+ bool no_nulls =
+ writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+
+ Status s = SerializeData<ParquetType, ArrowType>(array, ctx, buffer);
+ RETURN_NOT_OK(s);
+ if (no_nulls) {
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
+ } else {
+ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
+ array.null_bitmap_data(),
+ array.offset(), buffer));
+ }
+ return Status::OK();
+}
+
+template <typename ParquetType>
+Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ ArrowWriteContext* ctx,
+ TypedColumnWriter<ParquetType>* writer) {
+ using T = typename ParquetType::c_type;
+ const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
+ const T* values = nullptr;
+ // The values buffer may be null if the array is empty (ARROW-2744)
+ if (data.values() != nullptr) {
+ values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
+ } else {
+ DCHECK_EQ(data.length(), 0);
+ }
+ if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
+ } else {
+ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
+ data.null_bitmap_data(), data.offset(),
+ values));
+ }
+ return Status::OK();
+}
+
+#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType) \
+ case ::arrow::Type::ArrowEnum: \
+ return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
+ array, num_levels, def_levels, rep_levels, ctx, this);
+
+#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType) \
+ case ::arrow::Type::ArrowEnum: \
+ return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
+ ctx, this);
+
+#define ARROW_UNSUPPORTED() \
+ std::stringstream ss; \
+ ss << "Arrow type " << array.type()->ToString() \
+ << " cannot be written to Parquet type " << descr_->ToString(); \
+ return Status::Invalid(ss.str());
+
+// ----------------------------------------------------------------------
+// Write Arrow to BooleanType
+
+template <>
+Status TypedColumnWriterImpl<BooleanType>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ if (array.type_id() != ::arrow::Type::BOOL) {
+ ARROW_UNSUPPORTED();
+ }
+ bool* buffer = nullptr;
+ RETURN_NOT_OK(ctx->GetScratchData<bool>(array.length(), &buffer));
+
+ const auto& data = static_cast<const ::arrow::BooleanArray&>(array);
+ const uint8_t* values = nullptr;
+ // The values buffer may be null if the array is empty (ARROW-2744)
+ if (data.values() != nullptr) {
+ values = reinterpret_cast<const uint8_t*>(data.values()->data());
+ } else {
+ DCHECK_EQ(data.length(), 0);
+ }
+
+ int buffer_idx = 0;
+ int64_t offset = array.offset();
+ for (int i = 0; i < data.length(); i++) {
+ if (data.IsValid(i)) {
+ buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
+ }
+ }
+ PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, buffer));
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow types to INT32
+
+template <>
+struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
+ Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
+ const int64_t* input = array.raw_values();
+ for (int i = 0; i < array.length(); i++) {
+      *out++ = static_cast<int32_t>(*input++ / 86400000);  // 86400000 ms per day
+ }
+ return Status::OK();
+ }
+};
+
+template <>
+struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
+ Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
+ const int32_t* input = array.raw_values();
+ const auto& type = static_cast<const ::arrow::Time32Type&>(*array.type());
+    if (type.unit() == ::arrow::TimeUnit::SECOND) {
+      // Parquet TIME stores milliseconds here, so scale seconds up by 1000
+      for (int i = 0; i < array.length(); i++) {
+        out[i] = input[i] * 1000;
+      }
+ } else {
+ std::copy(input, input + array.length(), out);
+ }
+ return Status::OK();
+ }
+};
+
+template <>
+Status TypedColumnWriterImpl<Int32Type>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ switch (array.type()->id()) {
+ case ::arrow::Type::NA: {
+ PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+ } break;
+ WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
+ WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
+ WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
+ WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
+ WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
+ WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
+ WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
+ WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
+ WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
+ default:
+      ARROW_UNSUPPORTED();
+ }
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to Int64 and Int96
+
+#define INT96_CONVERT_LOOP(ConversionFunction) \
+ for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
+
+template <>
+struct SerializeFunctor<Int96Type, ::arrow::TimestampType> {
+ Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
+ const int64_t* input = array.raw_values();
+ const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
+ switch (type.unit()) {
+ case ::arrow::TimeUnit::NANO:
+ INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
+ break;
+ case ::arrow::TimeUnit::MICRO:
+ INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
+ break;
+ case ::arrow::TimeUnit::MILLI:
+ INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
+ break;
+ case ::arrow::TimeUnit::SECOND:
+ INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
+ break;
+ }
+ return Status::OK();
+ }
+};
+
+#define COERCE_DIVIDE -1
+#define COERCE_INVALID 0
+#define COERCE_MULTIPLY +1
+
+static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
+ // from seconds ...
+ {{COERCE_INVALID, 0}, // ... to seconds
+ {COERCE_MULTIPLY, 1000}, // ... to millis
+ {COERCE_MULTIPLY, 1000000}, // ... to micros
+ {COERCE_MULTIPLY, INT64_C(1000000000)}}, // ... to nanos
+ // from millis ...
+ {{COERCE_INVALID, 0},
+ {COERCE_MULTIPLY, 1},
+ {COERCE_MULTIPLY, 1000},
+ {COERCE_MULTIPLY, 1000000}},
+ // from micros ...
+ {{COERCE_INVALID, 0},
+ {COERCE_DIVIDE, 1000},
+ {COERCE_MULTIPLY, 1},
+ {COERCE_MULTIPLY, 1000}},
+ // from nanos ...
+ {{COERCE_INVALID, 0},
+ {COERCE_DIVIDE, 1000000},
+ {COERCE_DIVIDE, 1000},
+ {COERCE_MULTIPLY, 1}}};
+
+template <>
+struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
+ Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx,
+ int64_t* out) {
+ const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
+ auto source_unit = source_type.unit();
+ const int64_t* values = array.raw_values();
+
+ ::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
+ auto target_type = ::arrow::timestamp(target_unit);
+ bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();
+
+ auto DivideBy = [&](const int64_t factor) {
+ for (int64_t i = 0; i < array.length(); i++) {
+ if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
+ return Status::Invalid("Casting from ", source_type.ToString(), " to ",
+ target_type->ToString(),
+ " would lose data: ", values[i]);
+ }
+ out[i] = values[i] / factor;
+ }
+ return Status::OK();
+ };
+
+ auto MultiplyBy = [&](const int64_t factor) {
+ for (int64_t i = 0; i < array.length(); i++) {
+ out[i] = values[i] * factor;
+ }
+ return Status::OK();
+ };
+
+ const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
+ [static_cast<int>(target_unit)];
+
+ // .first -> coercion operation; .second -> scale factor
+ DCHECK_NE(coercion.first, COERCE_INVALID);
+ return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
+ : MultiplyBy(coercion.second);
+ }
+};
+
+#undef COERCE_DIVIDE
+#undef COERCE_INVALID
+#undef COERCE_MULTIPLY
+
+Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels,
+ ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
+ const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
+
+ auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
+ ArrowWriteContext temp_ctx = *ctx;
+ temp_ctx.properties = properties;
+ return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
+ values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
+ };
+
+ if (ctx->properties->coerce_timestamps_enabled()) {
+ // User explicitly requested coercion to specific unit
+ if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
+ // No data conversion necessary
+ return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
+ ctx, writer);
+ } else {
+ return WriteCoerce(ctx->properties);
+ }
+ } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
+ source_type.unit() == ::arrow::TimeUnit::NANO) {
+ // Absent superseding user instructions, when writing Parquet version 1.0 files,
+ // timestamps in nanoseconds are coerced to microseconds
+ std::shared_ptr<ArrowWriterProperties> properties =
+ (ArrowWriterProperties::Builder())
+ .coerce_timestamps(::arrow::TimeUnit::MICRO)
+ ->disallow_truncated_timestamps()
+ ->build();
+ return WriteCoerce(properties.get());
+ } else if (source_type.unit() == ::arrow::TimeUnit::SECOND) {
+ // Absent superseding user instructions, timestamps in seconds are coerced to
+ // milliseconds
+ std::shared_ptr<ArrowWriterProperties> properties =
+ (ArrowWriterProperties::Builder())
+ .coerce_timestamps(::arrow::TimeUnit::MILLI)
+ ->build();
+ return WriteCoerce(properties.get());
+ } else {
+ // No data conversion necessary
+ return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
+ writer);
+ }
+}
+
+template <>
+Status TypedColumnWriterImpl<Int64Type>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ switch (array.type()->id()) {
+ case ::arrow::Type::TIMESTAMP:
+ return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
+ WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
+ WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
+ WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
+ WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
+ default:
+ ARROW_UNSUPPORTED();
+ }
+}
+
+template <>
+Status TypedColumnWriterImpl<Int96Type>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ if (array.type_id() != ::arrow::Type::TIMESTAMP) {
+ ARROW_UNSUPPORTED();
+ }
+ return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
+ array, num_levels, def_levels, rep_levels, ctx, this);
+}
+
+// ----------------------------------------------------------------------
+// Floating point types
+
+template <>
+Status TypedColumnWriterImpl<FloatType>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ if (array.type_id() != ::arrow::Type::FLOAT) {
+ ARROW_UNSUPPORTED();
+ }
+ return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
+ this);
+}
+
+template <>
+Status TypedColumnWriterImpl<DoubleType>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ if (array.type_id() != ::arrow::Type::DOUBLE) {
+ ARROW_UNSUPPORTED();
+ }
+ return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
+ this);
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to BYTE_ARRAY
+
+template <typename ParquetType, typename ArrowType>
+struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_binary<ArrowType>> {
+ Status Serialize(const ::arrow::BinaryArray& array, ArrowWriteContext*,
+ ByteArray* out) {
+    // If the array consists only of empty strings or nulls, array.value_data()
+    // may already be a nullptr, so calling array.value_data()->data() would
+    // segfault.
+ const uint8_t* values = nullptr;
+ if (array.value_data()) {
+ values = reinterpret_cast<const uint8_t*>(array.value_data()->data());
+ DCHECK(values != nullptr);
+ }
+
+ // Slice offset is accounted for in raw_value_offsets
+ const int32_t* value_offset = array.raw_value_offsets();
+ if (array.null_count() == 0) {
+ // no nulls, just dump the data
+ for (int64_t i = 0; i < array.length(); i++) {
+ out[i] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
+ }
+ } else {
+ for (int64_t i = 0; i < array.length(); i++) {
+ if (array.IsValid(i)) {
+ out[i] =
+ ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
+ }
+ }
+ }
+ return Status::OK();
+ }
+};
+
+template <>
+Status TypedColumnWriterImpl<ByteArrayType>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ switch (array.type()->id()) {
+ WRITE_SERIALIZE_CASE(BINARY, BinaryType, ByteArrayType)
+ WRITE_SERIALIZE_CASE(STRING, BinaryType, ByteArrayType)
+ default:
+ ARROW_UNSUPPORTED();
+ }
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to FIXED_LEN_BYTE_ARRAY
+
+template <typename ParquetType, typename ArrowType>
+struct SerializeFunctor<ParquetType, ArrowType,
+ ::arrow::enable_if_fixed_size_binary<ArrowType>> {
+ Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
+ FLBA* out) {
+ if (array.null_count() == 0) {
+ // no nulls, just dump the data
+      // TODO(advancedxy): use a WriteBatch to avoid this step
+ for (int64_t i = 0; i < array.length(); i++) {
+ out[i] = FixedLenByteArray(array.GetValue(i));
+ }
+ } else {
+ for (int64_t i = 0; i < array.length(); i++) {
+ if (array.IsValid(i)) {
+ out[i] = FixedLenByteArray(array.GetValue(i));
+ }
+ }
+ }
+ return Status::OK();
+ }
+};
+
+template <>
+Status WriteArrowSerialize<FLBAType, ::arrow::Decimal128Type>(
+ const ::arrow::Array& array, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, ArrowWriteContext* ctx,
+ TypedColumnWriter<FLBAType>* writer) {
+ const auto& data = static_cast<const ::arrow::Decimal128Array&>(array);
+ const int64_t length = data.length();
+
+ FLBA* buffer;
+ RETURN_NOT_OK(ctx->GetScratchData<FLBA>(num_levels, &buffer));
+
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
+ const int32_t offset =
+ decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision());
+
+ const bool does_not_have_nulls =
+ writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+ const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
+ std::vector<uint64_t> big_endian_values(valid_value_count);
+
+ // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
+ // don't have to keep writing two loops to handle the case where we know there are no
+ // nulls
+ if (does_not_have_nulls) {
+ // no nulls, just dump the data
+    // TODO(advancedxy): use a WriteBatch to avoid this step
+ for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
+ auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+ big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+ big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+ buffer[i] = FixedLenByteArray(
+ reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+ }
+ } else {
+ for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
+ if (data.IsValid(i)) {
+ auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+ big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+ big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+ buffer[buffer_idx++] = FixedLenByteArray(
+ reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+ j += 2;
+ }
+ }
+ }
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
+ return Status::OK();
+}
+
+template <>
+Status TypedColumnWriterImpl<FLBAType>::WriteArrow(const int16_t* def_levels,
+ const int16_t* rep_levels,
+ int64_t num_levels,
+ const ::arrow::Array& array,
+ ArrowWriteContext* ctx) {
+ switch (array.type()->id()) {
+ WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
+ WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
+    default:
+      ARROW_UNSUPPORTED();
+  }
}
// ----------------------------------------------------------------------
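
The WriteArrow path above dispatches on the Arrow type id: each
WRITE_SERIALIZE_CASE routes to a SerializeFunctor specialization that converts
values into scratch memory, while WRITE_ZERO_COPY_CASE hands the Arrow buffer to
the writer directly. The timestamp table encodes, for each (source unit, target
unit) pair, whether to multiply or divide and by what factor; division can lose
data, which is why DivideBy checks the remainder. A self-contained sketch of
that lookup (the table mirrors kTimestampCoercionFactors; it is illustrative,
not the patch's code):

    #include <cstdint>
    #include <iostream>
    #include <utility>

    enum Op { DIVIDE = -1, INVALID = 0, MULTIPLY = +1 };

    // Same shape as kTimestampCoercionFactors: [from unit][to unit],
    // units ordered SECOND, MILLI, MICRO, NANO.
    static const std::pair<Op, int64_t> kFactors[4][4] = {
        {{INVALID, 0}, {MULTIPLY, 1000}, {MULTIPLY, 1000000}, {MULTIPLY, 1000000000LL}},
        {{INVALID, 0}, {MULTIPLY, 1}, {MULTIPLY, 1000}, {MULTIPLY, 1000000}},
        {{INVALID, 0}, {DIVIDE, 1000}, {MULTIPLY, 1}, {MULTIPLY, 1000}},
        {{INVALID, 0}, {DIVIDE, 1000000}, {DIVIDE, 1000}, {MULTIPLY, 1}}};

    int main() {
      const int kMilli = 1, kMicro = 2;
      auto coercion = kFactors[kMicro][kMilli];  // micro -> milli: {DIVIDE, 1000}
      int64_t value = 1500;                      // 1500 us
      // 1500 us is not a whole number of ms, so this coercion would truncate
      bool lossy = (coercion.first == DIVIDE) && (value % coercion.second != 0);
      std::cout << "op=" << coercion.first << " factor=" << coercion.second
                << " lossy=" << std::boolalpha << lossy << std::endl;  // lossy=true
      return 0;
    }
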
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 0a67a86..27ca400 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -18,6 +18,7 @@
#pragma once
#include <cstdint>
+#include <cstring>
#include <memory>
#include "parquet/exception.h"
@@ -26,6 +27,8 @@
namespace arrow {
+class Array;
+
namespace BitUtil {
class BitWriter;
} // namespace BitUtil
@@ -38,6 +41,7 @@ class RleEncoder;
namespace parquet {
+struct ArrowWriteContext;
class ColumnDescriptor;
class CompressedDataPage;
class DictionaryPage;
@@ -130,6 +134,13 @@ class PARQUET_EXPORT ColumnWriter {
/// \brief The file-level writer properties
virtual const WriterProperties* properties() = 0;
+
+ /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
+ /// error status if the array data type is not compatible with the concrete
+ /// writer type
+ virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
+ int64_t num_levels, const ::arrow::Array& array,
+ ArrowWriteContext* ctx) = 0;
};
// API to write values to a single column. This is the main client facing API.
@@ -186,4 +197,55 @@ using DoubleWriter = TypedColumnWriter<DoubleType>;
using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
+namespace internal {
+
+/// \brief Timestamp conversion constants
+constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
+
+template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
+inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
+ int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
+  impala_timestamp->value[2] = static_cast<uint32_t>(julian_days);
+
+  int64_t last_day_units = time % UnitPerDay;
+  auto last_day_nanos = last_day_units * NanosecondsPerUnit;
+  // impala_timestamp will be unaligned every other entry, so memcpy the low
+  // 64 bits instead of assigning through a reinterpret_cast, which would be
+  // undefined behavior.
+ std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
+}
+
+constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
+
+inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
+ impala_timestamp);
+}
+
+constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
+
+inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
+ milliseconds, impala_timestamp);
+}
+
+constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
+
+inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
+ microseconds, impala_timestamp);
+}
+
+constexpr int64_t kNanosecondsInNanos = INT64_C(1);
+
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
+ nanoseconds, impala_timestamp);
+}
+
+} // namespace internal
} // namespace parquet
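
For reference, the Int96 layout these helpers produce stores the Julian day
number in the third 32-bit word and nanoseconds-within-day in the low 64 bits.
A standalone worked example (the Int96 struct here is a local stand-in for
parquet's, for illustration only):

    #include <cstdint>
    #include <cstring>
    #include <iostream>

    // Local stand-in for parquet::Int96: three little-endian 32-bit words.
    struct Int96 { uint32_t value[3]; };

    constexpr int64_t kJulianEpochOffsetDays = 2440588;  // 1970-01-01 as a Julian day
    constexpr int64_t kNanosPerDay = 86400LL * 1000000000LL;

    int main() {
      // 1970-01-02T00:00:01 in nanoseconds since the Unix epoch
      int64_t nanos = kNanosPerDay + 1000000000LL;
      Int96 ts;
      ts.value[2] = static_cast<uint32_t>(nanos / kNanosPerDay + kJulianEpochOffsetDays);
      int64_t last_day_nanos = nanos % kNanosPerDay;
      std::memcpy(&ts, &last_day_nanos, sizeof(int64_t));  // fills value[0..1]
      std::cout << "julian day: " << ts.value[2]            // 2440589
                << ", nanos in day: " << last_day_nanos     // 1000000000
                << std::endl;
      return 0;
    }
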
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 4d3c5d6..e8ffad4 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -38,4 +38,15 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
}
}
+ArrowReaderProperties default_arrow_reader_properties() {
+ static ArrowReaderProperties default_reader_props;
+ return default_reader_props;
+}
+
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
+ static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
+ ArrowWriterProperties::Builder().build();
+ return default_writer_properties;
+}
+
} // namespace parquet
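
Both defaults are function-local statics, so callers that omit properties share
a single instance. Overriding them follows the Builder pattern used elsewhere in
this patch; a sketch (the coercion choice is illustrative):

    #include <memory>

    #include "parquet/properties.h"

    std::shared_ptr<parquet::ArrowWriterProperties> MakeCoercingProperties() {
      // Coerce all timestamps to microseconds and fail on lossy conversions,
      // instead of taking default_arrow_writer_properties()
      return parquet::ArrowWriterProperties::Builder()
          .coerce_timestamps(::arrow::TimeUnit::MICRO)
          ->disallow_truncated_timestamps()
          ->build();
    }
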
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index d08d7b0..40a358c 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -21,6 +21,9 @@
#include <memory>
#include <string>
#include <unordered_map>
+#include <unordered_set>
+
+#include "arrow/type.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
@@ -39,13 +42,13 @@ static bool DEFAULT_USE_BUFFERED_STREAM = false;
class PARQUET_EXPORT ReaderProperties {
public:
- explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
: pool_(pool) {
buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
buffer_size_ = DEFAULT_BUFFER_SIZE;
}
- ::arrow::MemoryPool* memory_pool() const { return pool_; }
+ MemoryPool* memory_pool() const { return pool_; }
std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
int64_t start, int64_t num_bytes);
@@ -61,7 +64,7 @@ class PARQUET_EXPORT ReaderProperties {
int64_t buffer_size() const { return buffer_size_; }
private:
- ::arrow::MemoryPool* pool_;
+ MemoryPool* pool_;
int64_t buffer_size_;
bool buffered_stream_enabled_;
};
@@ -142,7 +145,7 @@ class PARQUET_EXPORT WriterProperties {
created_by_(DEFAULT_CREATED_BY) {}
virtual ~Builder() {}
- Builder* memory_pool(::arrow::MemoryPool* pool) {
+ Builder* memory_pool(MemoryPool* pool) {
pool_ = pool;
return this;
}
@@ -320,7 +323,7 @@ class PARQUET_EXPORT WriterProperties {
}
private:
- ::arrow::MemoryPool* pool_;
+ MemoryPool* pool_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
@@ -336,7 +339,7 @@ class PARQUET_EXPORT WriterProperties {
std::unordered_map<std::string, bool> statistics_enabled_;
};
- inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
+ inline MemoryPool* memory_pool() const { return pool_; }
inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
@@ -395,10 +398,9 @@ class PARQUET_EXPORT WriterProperties {
private:
explicit WriterProperties(
- ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
- int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize,
- ParquetVersion::type version, const std::string& created_by,
- const ColumnProperties& default_column_properties,
+ MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
+ int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+ const std::string& created_by, const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>& column_properties)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
@@ -410,7 +412,7 @@ class PARQUET_EXPORT WriterProperties {
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
- ::arrow::MemoryPool* pool_;
+ MemoryPool* pool_;
int64_t dictionary_pagesize_limit_;
int64_t write_batch_size_;
int64_t max_row_group_length_;
@@ -423,6 +425,161 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
+// ----------------------------------------------------------------------
+// Properties specific to Apache Arrow columnar read and write
+
+static constexpr bool kArrowDefaultUseThreads = false;
+
+// Default number of rows to read when using ::arrow::RecordBatchReader
+static constexpr int64_t kArrowDefaultBatchSize = 64 * 1024;
+
+/// EXPERIMENTAL: Properties for configuring FileReader behavior.
+class PARQUET_EXPORT ArrowReaderProperties {
+ public:
+ explicit ArrowReaderProperties(bool use_threads = kArrowDefaultUseThreads)
+ : use_threads_(use_threads),
+ read_dict_indices_(),
+ batch_size_(kArrowDefaultBatchSize) {}
+
+ void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+
+ bool use_threads() const { return use_threads_; }
+
+ void set_read_dictionary(int column_index, bool read_dict) {
+ if (read_dict) {
+ read_dict_indices_.insert(column_index);
+ } else {
+ read_dict_indices_.erase(column_index);
+ }
+ }
+  bool read_dictionary(int column_index) const {
+    return read_dict_indices_.find(column_index) != read_dict_indices_.end();
+  }
+
+ void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
+
+ int64_t batch_size() const { return batch_size_; }
+
+ private:
+ bool use_threads_;
+ std::unordered_set<int> read_dict_indices_;
+ int64_t batch_size_;
+};
+
+/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
+PARQUET_EXPORT
+ArrowReaderProperties default_arrow_reader_properties();
+
+class PARQUET_EXPORT ArrowWriterProperties {
+ public:
+ class Builder {
+ public:
+ Builder()
+ : write_timestamps_as_int96_(false),
+ coerce_timestamps_enabled_(false),
+ coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
+ truncated_timestamps_allowed_(false) {}
+ virtual ~Builder() {}
+
+ Builder* disable_deprecated_int96_timestamps() {
+ write_timestamps_as_int96_ = false;
+ return this;
+ }
+
+ Builder* enable_deprecated_int96_timestamps() {
+ write_timestamps_as_int96_ = true;
+ return this;
+ }
+
+ Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
+ coerce_timestamps_enabled_ = true;
+ coerce_timestamps_unit_ = unit;
+ return this;
+ }
+
+ Builder* allow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = true;
+ return this;
+ }
+
+ Builder* disallow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = false;
+ return this;
+ }
+
+ std::shared_ptr<ArrowWriterProperties> build() {
+ return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
+ write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+ truncated_timestamps_allowed_));
+ }
+
+ private:
+ bool write_timestamps_as_int96_;
+
+ bool coerce_timestamps_enabled_;
+ ::arrow::TimeUnit::type coerce_timestamps_unit_;
+ bool truncated_timestamps_allowed_;
+ };
+
+ bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
+
+ bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
+ ::arrow::TimeUnit::type coerce_timestamps_unit() const {
+ return coerce_timestamps_unit_;
+ }
+
+ bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
+
+ private:
+ explicit ArrowWriterProperties(bool write_nanos_as_int96,
+ bool coerce_timestamps_enabled,
+ ::arrow::TimeUnit::type coerce_timestamps_unit,
+ bool truncated_timestamps_allowed)
+ : write_timestamps_as_int96_(write_nanos_as_int96),
+ coerce_timestamps_enabled_(coerce_timestamps_enabled),
+ coerce_timestamps_unit_(coerce_timestamps_unit),
+ truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
+
+ const bool write_timestamps_as_int96_;
+ const bool coerce_timestamps_enabled_;
+ const ::arrow::TimeUnit::type coerce_timestamps_unit_;
+ const bool truncated_timestamps_allowed_;
+};
+
+/// \brief State object used for writing Arrow data directly to a Parquet
+/// column chunk. API possibly not stable
+struct ArrowWriteContext {
+ ArrowWriteContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
+ : memory_pool(memory_pool),
+ properties(properties),
+ data_buffer(AllocateBuffer(memory_pool)),
+ def_levels_buffer(AllocateBuffer(memory_pool)) {}
+
+ template <typename T>
+ ::arrow::Status GetScratchData(const int64_t num_values, T** out) {
+ ARROW_RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
+ *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
+ return ::arrow::Status::OK();
+ }
+
+ MemoryPool* memory_pool;
+ const ArrowWriterProperties* properties;
+
+ // Buffer used for storing the data of an array converted to the physical type
+ // as expected by parquet-cpp.
+ std::shared_ptr<ResizableBuffer> data_buffer;
+
+ // We use the shared ownership of this buffer
+ std::shared_ptr<ResizableBuffer> def_levels_buffer;
+};
+
+PARQUET_EXPORT
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();
+
} // namespace parquet
#endif // PARQUET_COLUMN_PROPERTIES_H
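
ArrowWriteContext's GetScratchData is how the serialize path above reuses one
resizable buffer across column chunks rather than allocating per batch. A sketch
of the call pattern (illustrative; FillScratch and the int32_t payload are not
from the patch):

    #include "arrow/status.h"
    #include "parquet/properties.h"

    ::arrow::Status FillScratch(parquet::ArrowWriteContext* ctx, int64_t num_values) {
      int32_t* scratch = nullptr;
      // Resizes ctx->data_buffer (without shrinking to fit) and exposes it as int32_t*
      ARROW_RETURN_NOT_OK(ctx->GetScratchData<int32_t>(num_values, &scratch));
      for (int64_t i = 0; i < num_values; ++i) {
        scratch[i] = static_cast<int32_t>(i);
      }
      return ::arrow::Status::OK();
    }
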
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index e961127..1bada62 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -25,7 +25,6 @@
#include <utility>
#include "arrow/util/logging.h"
-#include "arrow/util/macros.h"
#include "parquet/exception.h"
#include "parquet/schema-internal.h"
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index 822afcd..f7e0cf3 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -1608,4 +1608,80 @@ class LogicalType::Impl::Unknown final : public LogicalType::Impl::SimpleCompati
GENERATE_MAKE(Unknown)
+namespace internal {
+
+/// \brief Compute the number of bytes required to represent a decimal of a
+/// given precision. Taken from the Apache Impala codebase. The comments next
+/// to the return values are the maximum value that can be represented in 2's
+/// complement with the returned number of bytes.
+int32_t DecimalSize(int32_t precision) {
+ DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
+ << precision;
+ DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
+ << precision;
+
+ switch (precision) {
+ case 1:
+ case 2:
+ return 1; // 127
+ case 3:
+ case 4:
+ return 2; // 32,767
+ case 5:
+ case 6:
+ return 3; // 8,388,607
+ case 7:
+ case 8:
+ case 9:
+      return 4;  // 2,147,483,647
+ case 10:
+ case 11:
+ return 5; // 549,755,813,887
+ case 12:
+ case 13:
+ case 14:
+ return 6; // 140,737,488,355,327
+ case 15:
+ case 16:
+ return 7; // 36,028,797,018,963,967
+ case 17:
+ case 18:
+ return 8; // 9,223,372,036,854,775,807
+ case 19:
+ case 20:
+ case 21:
+ return 9; // 2,361,183,241,434,822,606,847
+ case 22:
+ case 23:
+ return 10; // 604,462,909,807,314,587,353,087
+ case 24:
+ case 25:
+ case 26:
+ return 11; // 154,742,504,910,672,534,362,390,527
+ case 27:
+ case 28:
+ return 12; // 39,614,081,257,132,168,796,771,975,167
+ case 29:
+ case 30:
+ case 31:
+ return 13; // 10,141,204,801,825,835,211,973,625,643,007
+ case 32:
+ case 33:
+ return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
+ case 34:
+ case 35:
+ return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
+ case 36:
+ case 37:
+ case 38:
+ return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
+ default:
+ break;
+ }
+ DCHECK(false);
+ return -1;
+}
+
+} // namespace internal
+
} // namespace parquet
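
The lookup table agrees with the closed form ceil((precision * log2(10) + 1) / 8),
where the +1 reserves the sign bit for two's complement. A quick cross-check,
illustrative only:

    #include <cmath>
    #include <cstdint>
    #include <initializer_list>
    #include <iostream>

    // Smallest two's-complement byte width that can hold 10^precision - 1.
    int32_t DecimalSizeClosedForm(int32_t precision) {
      return static_cast<int32_t>(std::ceil((precision * std::log2(10.0) + 1) / 8.0));
    }

    int main() {
      // Spot-check against the switch table above: 9 -> 4, 18 -> 8, 38 -> 16
      for (int32_t p : {1, 9, 18, 29, 38}) {
        std::cout << p << " -> " << DecimalSizeClosedForm(p) << " bytes\n";
      }
      return 0;
    }
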
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index b317784..bc456ea 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -678,6 +678,12 @@ PARQUET_EXPORT SortOrder::type GetSortOrder(ConvertedType::type converted,
PARQUET_EXPORT SortOrder::type GetSortOrder(
const std::shared_ptr<const LogicalType>& logical_type, Type::type primitive);
+namespace internal {
+
+PARQUET_EXPORT
+int32_t DecimalSize(int32_t precision);
+
+} // namespace internal
} // namespace parquet
#endif // PARQUET_TYPES_H
diff --git a/dev/lint/run_iwyu.sh b/dev/lint/run_iwyu.sh
index edae841..46517cf 100755
--- a/dev/lint/run_iwyu.sh
+++ b/dev/lint/run_iwyu.sh
@@ -22,6 +22,7 @@ mkdir -p /build/lint
pushd /build/lint
cmake -GNinja \
+ -DCMAKE_BUILD_TYPE=debug \
-DARROW_FLIGHT=ON \
-DARROW_GANDIVA=ON \
-DARROW_PARQUET=ON \
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 32ee1ad..82ca9fb 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -327,6 +327,15 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
ReaderProperties default_reader_properties()
+ cdef cppclass ArrowReaderProperties:
+ ArrowReaderProperties()
+ void set_read_dictionary(int column_index, c_bool read_dict)
+        c_bool read_dictionary(int column_index)
+        void set_batch_size(int64_t batch_size)
+ int64_t batch_size()
+
+ ArrowReaderProperties default_arrow_reader_properties()
+
cdef cppclass ParquetFileReader:
shared_ptr[CFileMetaData] metadata()
@@ -348,17 +357,19 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
Builder* write_batch_size(int64_t batch_size)
shared_ptr[WriterProperties] build()
+ cdef cppclass ArrowWriterProperties:
+ cppclass Builder:
+ Builder()
+ Builder* disable_deprecated_int96_timestamps()
+ Builder* enable_deprecated_int96_timestamps()
+ Builder* coerce_timestamps(TimeUnit unit)
+ Builder* allow_truncated_timestamps()
+ Builder* disallow_truncated_timestamps()
+ shared_ptr[ArrowWriterProperties] build()
+ c_bool support_deprecated_int96_timestamps()
-cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
- cdef cppclass ArrowReaderProperties:
- ArrowReaderProperties()
- void set_read_dictionary(int column_index, c_bool read_dict)
- c_bool read_dictionary()
- void set_batch_size()
- int64_t batch_size()
-
- ArrowReaderProperties default_arrow_reader_properties()
+cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
cdef cppclass FileReader:
FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out)
@@ -422,17 +433,6 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
const shared_ptr[CFileMetaData] metadata() const
- cdef cppclass ArrowWriterProperties:
- cppclass Builder:
- Builder()
- Builder* disable_deprecated_int96_timestamps()
- Builder* enable_deprecated_int96_timestamps()
- Builder* coerce_timestamps(TimeUnit unit)
- Builder* allow_truncated_timestamps()
- Builder* disallow_truncated_timestamps()
- shared_ptr[ArrowWriterProperties] build()
- c_bool support_deprecated_int96_timestamps()
-
CStatus WriteMetaDataFile(
const CFileMetaData& file_metadata,
const OutputStream* sink)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 8cfaa77..5af51c5 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2754,7 +2754,7 @@ RcppExport SEXP _arrow_ipc___ReadMessage(SEXP stream_sexp){
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<parquet::arrow::ArrowReaderProperties> parquet___arrow___ArrowReaderProperties__Make(bool use_threads);
+std::shared_ptr<parquet::ArrowReaderProperties> parquet___arrow___ArrowReaderProperties__Make(bool use_threads);
RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_threads_sexp){
BEGIN_RCPP
Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
@@ -2769,10 +2769,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_th
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, bool use_threads);
+void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, bool use_threads);
RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){
BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
parquet___arrow___ArrowReaderProperties__set_use_threads(properties, use_threads);
return R_NilValue;
@@ -2786,10 +2786,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads(
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, bool use_threads);
+bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, bool use_threads);
RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){
BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_use_threads(properties, use_threads));
END_RCPP
@@ -2802,10 +2802,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_use_threads(
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, int column_index);
+bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, int column_index);
RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp){
BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
Rcpp::traits::input_parameter<int>::type column_index(column_index_sexp);
return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_read_dictionary(properties, column_index));
END_RCPP
@@ -2818,10 +2818,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_diction
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, int column_index, bool read_dict);
+void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, int column_index, bool read_dict);
RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp, SEXP read_dict_sexp){
BEGIN_RCPP
- Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
Rcpp::traits::input_parameter<int>::type column_index(column_index_sexp);
Rcpp::traits::input_parameter<bool>::type read_dict(read_dict_sexp);
parquet___arrow___ArrowReaderProperties__set_read_dictionary(properties, column_index, read_dict);
@@ -2836,11 +2836,11 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_diction
// parquet.cpp
#if defined(ARROW_R_WITH_ARROW)
-std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(const std::shared_ptr<arrow::io::RandomAccessFile>& file, const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& props);
+std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(const std::shared_ptr<arrow::io::RandomAccessFile>& file, const std::shared_ptr<parquet::ArrowReaderProperties>& props);
RcppExport SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp){
BEGIN_RCPP
Rcpp::traits::input_parameter<const std::shared_ptr<arrow::io::RandomAccessFile>&>::type file(file_sexp);
- Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type props(props_sexp);
+ Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type props(props_sexp);
return Rcpp::wrap(parquet___arrow___FileReader__OpenFile(file, props));
END_RCPP
}
diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp
index 5124e9e..692916b 100644
--- a/r/src/parquet.cpp
+++ b/r/src/parquet.cpp
@@ -24,35 +24,35 @@
#include <parquet/exception.h>
// [[arrow::export]]
-std::shared_ptr<parquet::arrow::ArrowReaderProperties>
+std::shared_ptr<parquet::ArrowReaderProperties>
parquet___arrow___ArrowReaderProperties__Make(bool use_threads) {
- return std::make_shared<parquet::arrow::ArrowReaderProperties>(use_threads);
+ return std::make_shared<parquet::ArrowReaderProperties>(use_threads);
}
// [[arrow::export]]
void parquet___arrow___ArrowReaderProperties__set_use_threads(
- const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+ const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
bool use_threads) {
properties->set_use_threads(use_threads);
}
// [[arrow::export]]
bool parquet___arrow___ArrowReaderProperties__get_use_threads(
- const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+ const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
bool use_threads) {
return properties->use_threads();
}
// [[arrow::export]]
bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(
- const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+ const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
int column_index) {
return properties->read_dictionary(column_index);
}
// [[arrow::export]]
void parquet___arrow___ArrowReaderProperties__set_read_dictionary(
- const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+ const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
int column_index, bool read_dict) {
properties->set_read_dictionary(column_index, read_dict);
}
@@ -60,10 +60,11 @@ void parquet___arrow___ArrowReaderProperties__set_read_dictionary(
// [[arrow::export]]
std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(
const std::shared_ptr<arrow::io::RandomAccessFile>& file,
- const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& props) {
+ const std::shared_ptr<parquet::ArrowReaderProperties>& props) {
std::unique_ptr<parquet::arrow::FileReader> reader;
- PARQUET_THROW_NOT_OK(
- parquet::arrow::OpenFile(file, arrow::default_memory_pool(), *props, &reader));
+ parquet::arrow::FileReaderBuilder builder;
+ PARQUET_THROW_NOT_OK(builder.Open(file));
+ PARQUET_THROW_NOT_OK(builder.properties(*props)->Build(&reader));
return reader;
}
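
The R binding now goes through FileReaderBuilder instead of the deprecated
OpenFile overload. The equivalent pattern in plain C++, sketched under the
assumptions that the path and use_threads choice are illustrative:

    #include <memory>
    #include <string>

    #include "arrow/io/file.h"
    #include "parquet/arrow/reader.h"
    #include "parquet/exception.h"

    std::unique_ptr<parquet::arrow::FileReader> OpenReader(const std::string& path) {
      std::shared_ptr<::arrow::io::ReadableFile> file;
      PARQUET_THROW_NOT_OK(::arrow::io::ReadableFile::Open(path, &file));
      parquet::ArrowReaderProperties props(/*use_threads=*/true);
      std::unique_ptr<parquet::arrow::FileReader> reader;
      parquet::arrow::FileReaderBuilder builder;
      PARQUET_THROW_NOT_OK(builder.Open(file));
      PARQUET_THROW_NOT_OK(builder.properties(props)->Build(&reader));
      return reader;
    }
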