You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/03/07 20:22:01 UTC
parquet-cpp git commit: PARQUET-890: Support I/O of DATE columns in parquet_arrow
Repository: parquet-cpp
Updated Branches:
refs/heads/master fe8f98d0e -> c41a718da
PARQUET-890: Support I/O of DATE columns in parquet_arrow
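Also fixes a bug in reading non-nullable INT96 timestamp columns, where each batch was written to the start of the output buffer instead of at the current offset.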
Author: Korn, Uwe <Uw...@blue-yonder.com>
Closes #266 from xhochy/PARQUET-890 and squashes the following commits:
8481c2c [Korn, Uwe] ninja lint
666d41b [Korn, Uwe] PARQUET-890: Support I/O of DATE columns in parquet_arrow
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c41a718d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c41a718d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c41a718d
Branch: refs/heads/master
Commit: c41a718dae9c60465ea0d8c99d6e3bdca11f802f
Parents: fe8f98d
Author: Korn, Uwe <Uw...@blue-yonder.com>
Authored: Tue Mar 7 15:21:51 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Mar 7 15:21:51 2017 -0500
----------------------------------------------------------------------
src/parquet/arrow/arrow-reader-writer-test.cc | 13 ++-
src/parquet/arrow/arrow-schema-test.cc | 11 ++-
src/parquet/arrow/reader.cc | 49 +++++++++-
src/parquet/arrow/schema.cc | 18 +++-
src/parquet/arrow/test-util.h | 49 +++++++++-
src/parquet/arrow/writer.cc | 106 ++++++++++++++-------
6 files changed, 200 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 2dfdbd2..a0a39f1 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -149,6 +149,15 @@ struct test_traits<::arrow::TimestampType> {
const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
template <>
+struct test_traits<::arrow::DateType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::DATE;
+ static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::DateType>::value(14688000000000);
+
+template <>
struct test_traits<::arrow::FloatType> {
static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
static constexpr LogicalType::type logical_enum = LogicalType::NONE;
@@ -309,8 +318,8 @@ class TestParquetIO : public ::testing::Test {
typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
- ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
- ::arrow::StringType, ::arrow::BinaryType>
+ ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::DateType, ::arrow::FloatType,
+ ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
TestTypes;
TYPED_TEST_CASE(TestParquetIO, TestTypes);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 43e57d8..8db792f 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -98,6 +98,10 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+ arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));
+
parquet_fields.push_back(
PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
@@ -339,9 +343,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
TEST_F(TestConvertParquetSchema, UnsupportedThings) {
std::vector<NodePtr> unsupported_nodes;
- unsupported_nodes.push_back(PrimitiveNode::Make(
- "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
-
for (const NodePtr& node : unsupported_nodes) {
ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
}
@@ -394,6 +395,10 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
+ parquet_fields.push_back(PrimitiveNode::Make(
+ "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+ arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));
+
parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index df34d4c..73f6d87 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -361,7 +361,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
values_to_read, nullptr, nullptr, values, &values_read));
- int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+ int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
for (int64_t i = 0; i < values_read; i++) {
*out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
}
@@ -371,6 +371,24 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
}
template <>
+Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::DateType, Int32Type>(
+ TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
+ RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
+ auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
+ int64_t values_read;
+ PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
+ values_to_read, nullptr, nullptr, values, &values_read));
+
+ int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
+ for (int64_t i = 0; i < values_read; i++) {
+ *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
+ }
+ valid_bits_idx_ += values_read;
+
+ return Status::OK();
+}
+
+template <>
Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
int64_t* levels_read) {
@@ -464,6 +482,30 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
}
template <>
+Status ColumnReader::Impl::ReadNullableBatch<::arrow::DateType, Int32Type>(
+ TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
+ int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
+ RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
+ auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
+ int64_t null_count;
+ PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
+ values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));
+
+ auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+ INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
+ for (int64_t i = 0; i < *values_read; i++) {
+ if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
+ data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
+ }
+ READ_NEXT_BITSET(valid_bits_ptr_);
+ }
+ null_count_ += null_count;
+ valid_bits_idx_ += *values_read;
+
+ return Status::OK();
+}
+
+template <>
Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
@@ -843,6 +885,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TYPED_BATCH_CASE(DATE, ::arrow::DateType, Int32Type)
TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
@@ -865,7 +908,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
break;
}
default:
- return Status::NotImplemented(field_->type->ToString());
+ std::stringstream ss;
+ ss << "No support for reading columns of type " << field_->type->ToString();
+ return Status::NotImplemented(ss.str());
}
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 65e3381..0c336d9 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -77,7 +77,10 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
*out = MakeDecimalType(node);
break;
default:
- return Status::NotImplemented("unhandled type");
+ std::stringstream ss;
+ ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ << " for fixed-length binary array";
+ return Status::NotImplemented(ss.str());
break;
}
@@ -104,11 +107,17 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::UINT_32:
*out = ::arrow::uint32();
break;
+ case LogicalType::DATE:
+ *out = ::arrow::date();
+ break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
default:
- return Status::NotImplemented("Unhandled logical type for int32");
+ std::stringstream ss;
+ ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ << " for INT32";
+ return Status::NotImplemented(ss.str());
break;
}
return Status::OK();
@@ -129,7 +138,10 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
*out = TIMESTAMP_MS;
break;
default:
- return Status::NotImplemented("Unhandled logical type for int64");
+ std::stringstream ss;
+ ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+ << " for INT64";
+ return Status::NotImplemented(ss.str());
break;
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index bfc9ce1..07f1f28 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -34,6 +34,9 @@ template <typename ArrowType>
using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
template <typename ArrowType>
+using is_arrow_date = std::is_same<ArrowType, ::arrow::DateType>;
+
+template <typename ArrowType>
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
template <typename ArrowType>
@@ -53,10 +56,27 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
}
template <class ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArray(
+typename std::enable_if<
+ is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+ std::vector<typename ArrowType::c_type> values;
+ ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+
+ // Passing data type so this will work with TimestampType too
+ ::arrow::NumericBuilder<ArrowType> builder(
+ ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size());
+ return builder.Finish(out);
+}
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ for (size_t i = 0; i < size; i++) {
+ values[i] *= 86400000;
+ }
// Passing data type so this will work with TimestampType too
::arrow::NumericBuilder<ArrowType> builder(
@@ -107,13 +127,38 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
// This helper function only supports (size/2) nulls.
template <typename ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
+typename std::enable_if<
+ is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
+ std::vector<typename ArrowType::c_type> values;
+
+ // Seed is random in Arrow right now
+ (void)seed;
+ ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ std::vector<uint8_t> valid_bytes(size, 1);
+
+ for (size_t i = 0; i < num_nulls; i++) {
+ valid_bytes[i * 2] = 0;
+ }
+
+ // Passing data type so this will work with TimestampType too
+ ::arrow::NumericBuilder<ArrowType> builder(
+ ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size(), valid_bytes.data());
+ return builder.Finish(out);
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
std::vector<typename ArrowType::c_type> values;
// Seed is random in Arrow right now
(void)seed;
::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+ for (size_t i = 0; i < size; i++) {
+ values[i] *= 86400000;
+ }
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 90e037f..6d2f9c0 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -257,39 +257,16 @@ class FileWriter::Impl {
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
+ Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
+ int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+ const typename ArrowType::c_type* data_ptr);
+
+ template <typename ParquetType, typename ArrowType>
Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
const uint8_t* valid_bits, int64_t valid_bits_offset,
const typename ArrowType::c_type* data_ptr);
- // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
- // buffer
- template <typename InType, typename OutType>
- struct can_copy_ptr {
- static constexpr bool value =
- std::is_same<InType, OutType>::value ||
- (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
- (sizeof(InType) == sizeof(OutType)));
- };
-
- template <typename InType, typename OutType,
- typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
- Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
- *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
- return Status::OK();
- }
-
- template <typename InType, typename OutType,
- typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
- Status ConvertPhysicalType(
- const InType* in_ptr, int64_t length, const OutType** out_ptr) {
- RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
- OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
- std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
- *out_ptr = mutable_out_ptr;
- return Status::OK();
- }
-
Status WriteColumnChunk(const Array& data);
Status Close();
@@ -323,7 +300,6 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels) {
using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
auto data = static_cast<const PrimitiveArray*>(array.get());
auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data());
@@ -331,11 +307,8 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
// no nulls, just dump the data
- const ParquetCType* data_writer_ptr = nullptr;
- RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
- data_ptr + data->offset(), array->length(), &data_writer_ptr)));
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr));
+ RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
+ num_levels, def_levels, rep_levels, data_ptr + data->offset())));
} else {
const uint8_t* valid_bits = data->null_bitmap_data();
RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
@@ -347,6 +320,49 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
}
template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+ int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+ using ParquetCType = typename ParquetType::c_type;
+ RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
+ auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+ std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::DateType>(
+ TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+ RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+ auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+ for (int i = 0; i < num_values; i++) {
+ buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
+ template <> \
+ Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
+ TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
+ const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) { \
+ PARQUET_CATCH_NOT_OK( \
+ writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
+ return Status::OK(); \
+ }
+
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
+NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+
+template <typename ParquetType, typename ArrowType>
Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
int64_t num_values, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
@@ -368,6 +384,27 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ
return Status::OK();
}
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::DateType>(
+ TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const int64_t* data_ptr) {
+ RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+ auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+ INIT_BITSET(valid_bits, valid_bits_offset);
+ for (int i = 0; i < num_values; i++) {
+ if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+ // Convert from milliseconds into days since the epoch
+ buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+ }
+ READ_NEXT_BITSET(valid_bits);
+ }
+ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+ num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+ return Status::OK();
+}
+
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
@@ -519,6 +556,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
+ WRITE_BATCH_CASE(DATE, DateType, Int32Type)
WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)