You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/07/16 21:43:48 UTC
parquet-cpp git commit: PARQUET-1035: Write Int96 from Arrow
timestamp(ns)
Repository: parquet-cpp
Updated Branches:
refs/heads/master 178ef72a4 -> e998dfb40
PARQUET-1035: Write Int96 from Arrow timestamp(ns)
Closes #356
cc @c-nichols
Author: Colin Nichols <ni...@gmail.com>
Author: Uwe L. Korn <uw...@apache.org>
Closes #371 from xhochy/PARQUET-1035 and squashes the following commits:
20e28e3 [Uwe L. Korn] Make support for INT96 optional
dd197fc [Colin Nichols] Move timestamp conversion function to header file
dfc9a00 [Colin Nichols] PARQUET-1035 Write Int96 from Arrow timestamp(ns)
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/e998dfb4
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/e998dfb4
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/e998dfb4
Branch: refs/heads/master
Commit: e998dfb40403197dbb8efa53d077449c81552d26
Parents: 178ef72
Author: Colin Nichols <ni...@gmail.com>
Authored: Sun Jul 16 17:43:43 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jul 16 17:43:43 2017 -0400
----------------------------------------------------------------------
src/parquet/arrow/arrow-reader-writer-test.cc | 63 +++++++--
src/parquet/arrow/schema.cc | 41 ++++--
src/parquet/arrow/schema.h | 6 +-
src/parquet/arrow/writer.cc | 154 +++++++++++++++++++--
src/parquet/arrow/writer.h | 84 ++++++++++-
5 files changed, 306 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 746ce14..4424ea6 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -290,19 +290,23 @@ template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
- int64_t row_group_size, std::shared_ptr<Buffer>* out) {
+ int64_t row_group_size,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::shared_ptr<Buffer>* out) {
auto sink = std::make_shared<InMemoryOutputStream>();
ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
- row_group_size, default_writer_properties()));
+ row_group_size, default_writer_properties(), arrow_properties));
*out = sink->GetBuffer();
}
void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
int64_t row_group_size, const std::vector<int>& column_subset,
- std::shared_ptr<Table>* out) {
+ std::shared_ptr<Table>* out,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties()) {
std::shared_ptr<Buffer> buffer;
- WriteTableToBuffer(table, num_threads, row_group_size, &buffer);
+ WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
@@ -919,7 +923,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
this->CheckSingleColumnRequiredTableRead(4);
}
-void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) {
using ::arrow::ArrayFromVector;
std::vector<bool> is_valid = {true, true, true, false, true, true};
@@ -928,26 +932,41 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
auto f0 = field("f0", ::arrow::date32());
auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
- auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
- auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO));
- std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4}));
+ std::shared_ptr<::arrow::Field> f3;
+ if (nanos_as_micros) {
+ f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
+ } else {
+ f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
+ }
+ auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
+ auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+ std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
std::vector<int32_t> t32_values = {
1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
1489272000000, 1489272000000, 1489273000000};
+ std::vector<int64_t> t64_us_values = {
+ 1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
- std::shared_ptr<Array> a0, a1, a2, a3, a4;
+ std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
- ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3);
- ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4);
+ if (nanos_as_micros) {
+ ArrayFromVector<::arrow::TimestampType, int64_t>(
+ f3->type(), is_valid, t64_us_values, &a3);
+ } else {
+ ArrayFromVector<::arrow::TimestampType, int64_t>(
+ f3->type(), is_valid, t64_values, &a3);
+ }
+ ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
+ ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
std::vector<std::shared_ptr<::arrow::Column>> columns = {
std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
- std::make_shared<Column>("f4", a4)};
+ std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
*out = std::make_shared<::arrow::Table>(schema, columns);
}
@@ -955,8 +974,17 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
std::shared_ptr<Table> table;
MakeDateTimeTypesTable(&table);
+ // Use deprecated INT96 type
std::shared_ptr<Table> result;
+ DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result,
+ ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
+
+ ASSERT_TRUE(table->Equals(*result));
+
+ // Cast nanoseconds to microseconds and use INT64 physical type
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+ std::shared_ptr<Table> expected;
+ MakeDateTimeTypesTable(&table, true);
ASSERT_TRUE(table->Equals(*result));
}
@@ -1050,7 +1078,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
MakeDoubleTable(num_columns, num_rows, 1, &table);
std::shared_ptr<Buffer> buffer;
- WriteTableToBuffer(table, 1, num_rows / 2, &buffer);
+ WriteTableToBuffer(table, 1, num_rows / 2, default_arrow_writer_properties(), &buffer);
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(
@@ -1449,6 +1477,15 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
+TEST(TestImpalaConversion, NanosecondToImpala) {
+  // Input: 2017-06-20 16:32:56.123456789, given as nanoseconds since 1970-01-01.
+  const int64_t epoch_nanos = INT64_C(1497976376123456789);
+
+  Int96 calculated;
+  internal::NanosecondsToImpalaTimestamp(epoch_nanos, &calculated);
+
+  // Words 0-1: nanoseconds within the day (low, high); word 2: Julian day number.
+  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
+  ASSERT_EQ(expected, calculated);
+}
+
TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
// PARQUET-995
const char* data_dir = std::getenv("PARQUET_TEST_DATA");
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 2a4ddcd..d14ee4f 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -381,11 +381,13 @@ Status FromParquetSchema(
}
Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
- bool nullable, const WriterProperties& properties, NodePtr* out) {
+ bool nullable, bool support_int96_nanoseconds, const WriterProperties& properties,
+ NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
NodePtr element;
- RETURN_NOT_OK(FieldToNode(type->value_field(), properties, &element));
+ RETURN_NOT_OK(
+ FieldToNode(type->value_field(), properties, &element, support_int96_nanoseconds));
NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
*out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST);
@@ -393,13 +395,14 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str
}
Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
- const std::string& name, bool nullable, const WriterProperties& properties,
- NodePtr* out) {
+ const std::string& name, bool nullable, bool support_int96_nanoseconds,
+ const WriterProperties& properties, NodePtr* out) {
Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
- RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
+ RETURN_NOT_OK(
+ FieldToNode(type->child(i), properties, &children[i], support_int96_nanoseconds));
}
*out = GroupNode::Make(name, repetition, children);
@@ -407,7 +410,7 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
}
Status FieldToNode(const std::shared_ptr<Field>& field,
- const WriterProperties& properties, NodePtr* out) {
+ const WriterProperties& properties, NodePtr* out, bool support_int96_nanoseconds) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition =
@@ -486,14 +489,24 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
case ArrowType::TIMESTAMP: {
auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
auto unit = timestamp_type->unit();
- type = ParquetType::INT64;
if (unit == ::arrow::TimeUnit::MILLI) {
+ type = ParquetType::INT64;
logical_type = LogicalType::TIMESTAMP_MILLIS;
} else if (unit == ::arrow::TimeUnit::MICRO) {
+ type = ParquetType::INT64;
logical_type = LogicalType::TIMESTAMP_MICROS;
+ } else if (unit == ::arrow::TimeUnit::NANO) {
+ if (support_int96_nanoseconds) {
+ type = ParquetType::INT96;
+ // No corresponding logical type
+ } else {
+ type = ParquetType::INT64;
+ logical_type = LogicalType::TIMESTAMP_MICROS;
+ }
} else {
return Status::NotImplemented(
- "Only MILLI and MICRO units supported for Arrow timestamps with Parquet.");
+ "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
+ "Parquet.");
}
} break;
case ArrowType::TIME32:
@@ -510,11 +523,13 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
} break;
case ArrowType::STRUCT: {
auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
- return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
+ return StructToNode(struct_type, field->name(), field->nullable(),
+ support_int96_nanoseconds, properties, out);
} break;
case ArrowType::LIST: {
auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
- return ListToNode(list_type, field->name(), field->nullable(), properties, out);
+ return ListToNode(list_type, field->name(), field->nullable(),
+ support_int96_nanoseconds, properties, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -525,10 +540,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
}
Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
- const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+ const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
+ bool support_int96_nanoseconds) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
- RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
+ RETURN_NOT_OK(FieldToNode(
+ arrow_schema->field(i), properties, &nodes[i], support_int96_nanoseconds));
}
NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 69ca661..d4f5ea3 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -66,10 +66,12 @@ namespace arrow {
const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
- const WriterProperties& properties, schema::NodePtr* out);
+ const WriterProperties& properties, schema::NodePtr* out,
+ bool support_int96_nanoseconds = false);
::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
- const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out);
+ const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
+ bool support_int96_nanoseconds = false);
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 104c040..c562b27 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -36,6 +36,7 @@ using arrow::Int16Array;
using arrow::Int16Builder;
using arrow::Field;
using arrow::MemoryPool;
+using arrow::NumericArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::ListArray;
@@ -52,6 +53,12 @@ namespace arrow {
namespace BitUtil = ::arrow::BitUtil;
+// Returns the shared ArrowWriterProperties instance produced by an untouched
+// Builder. The instance is created on first call and reused afterwards.
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
+  static auto shared_defaults = ArrowWriterProperties::Builder().build();
+  return shared_defaults;
+}
+
class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool)
@@ -241,13 +248,18 @@ Status LevelBuilder::VisitInline(const Array& array) {
class FileWriter::Impl {
public:
- Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
Status NewRowGroup(int64_t chunk_size);
template <typename ParquetType, typename ArrowType>
Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
+ Status TypedWriteBatchConvertedNanos(ColumnWriter* writer,
+ const std::shared_ptr<Array>& data, int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels);
+
template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
const ArrowType& type, int64_t num_values, int64_t num_levels,
@@ -274,13 +286,16 @@ class FileWriter::Impl {
PoolBuffer data_buffer_;
std::unique_ptr<ParquetFileWriter> writer_;
RowGroupWriter* row_group_writer_;
+ std::shared_ptr<ArrowWriterProperties> arrow_properties_;
};
-FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
: pool_(pool),
data_buffer_(pool),
writer_(std::move(writer)),
- row_group_writer_(nullptr) {}
+ row_group_writer_(nullptr),
+ arrow_properties_(arrow_properties) {}
Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
@@ -361,6 +376,25 @@ Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
return Status::OK();
}
+// Int96 specialization for timestamp data without nulls: every nanosecond value
+// is converted to an Impala-style Int96 inside data_buffer_ and the converted
+// batch is handed to the column writer. Only TimeUnit::NANO is handled; any other
+// unit returns Status::NotImplemented before the scratch buffer is touched.
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
+    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int64_t* data_ptr) {
+  if (type.unit() != TimeUnit::NANO) {
+    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
+  }
+
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
+  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+  // 64-bit index: num_values is an int64_t, so an int counter could overflow.
+  for (int64_t i = 0; i < num_values; i++) {
+    internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+  }
+
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>( \
@@ -455,6 +489,32 @@ Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
return Status::OK();
}
+// Int96 specialization for timestamp data that may contain nulls. Values whose
+// validity bit is set are converted to Impala-style Int96 at the same index in
+// data_buffer_ (null slots are left unconverted), then the buffer is written with
+// WriteBatchSpaced together with the validity bitmap. Only TimeUnit::NANO is
+// handled; any other unit returns Status::NotImplemented before the scratch
+// buffer is touched.
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
+    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int64_t* data_ptr) {
+  if (type.unit() != TimeUnit::NANO) {
+    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
+  }
+
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
+  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+  // INIT_BITSET / READ_NEXT_BITSET are macros defined elsewhere; they presumably
+  // declare and advance bitset_valid_bits / bit_offset_valid_bits — TODO confirm.
+  INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
+
+  // 64-bit index: num_values is an int64_t, so an int counter could overflow.
+  for (int64_t i = 0; i < num_values; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+      internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \
template <> \
Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \
@@ -475,6 +535,40 @@ NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+// Writes a nanosecond timestamp array to an INT64 column: each value is divided
+// by 1000 (integer division) into data_buffer_, and the result is written through
+// the Int64 timestamp batch writers using a MICRO timestamp type. The column
+// writer is closed before returning.
+Status FileWriter::Impl::TypedWriteBatchConvertedNanos(ColumnWriter* column_writer,
+    const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  // Note that we can only use data_buffer_ here as we write timestamps with the fast
+  // path.
+  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
+  int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
+
+  auto data = static_cast<const NumericArray<::arrow::TimestampType>*>(array.get());
+  // values() is the raw values buffer and does not account for the array's slice
+  // offset, so advance by offset(): converted element i must line up with validity
+  // bit (offset + i), which is how the bitmap is consumed below.
+  auto data_ptr =
+      reinterpret_cast<const int64_t*>(data->values()->data()) + data->offset();
+  auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
+
+  // Convert nanoseconds to microseconds
+  for (int64_t i = 0; i < array->length(); i++) {
+    data_buffer_ptr[i] = data_ptr[i] / 1000;
+  }
+
+  std::shared_ptr<::arrow::TimestampType> type =
+      std::static_pointer_cast<::arrow::TimestampType>(
+          ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+    // no nulls, just dump the data
+    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
+        array->length(), num_levels, def_levels, rep_levels, data_buffer_ptr)));
+  } else {
+    const uint8_t* valid_bits = data->null_bitmap_data();
+    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
+        array->length(), num_levels, def_levels, rep_levels, valid_bits, data->offset(),
+        data_buffer_ptr)));
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
// This specialization seems quite similar but it significantly differs in two points:
// * offset is added at the most latest time to the pointer as we have sub-byte access
// * Arrow data is stored bitwise thus we cannot use std::copy to transform from
@@ -645,6 +739,21 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
}
}
WRITE_BATCH_CASE(NA, NullType, Int32Type)
+ case ::arrow::Type::TIMESTAMP: {
+ auto timestamp_type =
+ static_cast<::arrow::TimestampType*>(values_array->type().get());
+ if (timestamp_type->unit() == ::arrow::TimeUnit::NANO &&
+ arrow_properties_->support_deprecated_int96_timestamps()) {
+ return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
+ column_writer, values_array, num_levels, def_levels, rep_levels);
+ } else if (timestamp_type->unit() == ::arrow::TimeUnit::NANO) {
+ return TypedWriteBatchConvertedNanos(
+ column_writer, values_array, num_levels, def_levels, rep_levels);
+ } else {
+ return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
+ column_writer, values_array, num_levels, def_levels, rep_levels);
+ }
+ }
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -660,7 +769,6 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
- WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
default:
@@ -688,22 +796,32 @@ MemoryPool* FileWriter::memory_pool() const {
FileWriter::~FileWriter() {}
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
- : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+ : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
std::unique_ptr<FileWriter>* writer) {
+ return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
+}
+
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* writer) {
std::shared_ptr<SchemaDescriptor> parquet_schema;
- RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema));
+ RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema,
+ arrow_properties->support_deprecated_int96_timestamps()));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
std::unique_ptr<ParquetFileWriter> base_writer =
ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
- writer->reset(new FileWriter(pool, std::move(base_writer)));
+ writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties));
return Status::OK();
}
@@ -715,6 +833,15 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
return Open(schema, pool, wrapper, properties, writer);
}
+// Overload taking an Arrow output stream: the sink is wrapped in an
+// ArrowOutputStream and the call is forwarded to the Open overload that accepts
+// the wrapped stream together with the Arrow writer properties.
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+    std::unique_ptr<FileWriter>* writer) {
+  return Open(schema, pool, std::make_shared<ArrowOutputStream>(sink), properties,
+      arrow_properties, writer);
+}
+
Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
// TODO(ARROW-232) Support writing chunked arrays.
for (int i = 0; i < table.num_columns(); i++) {
@@ -742,18 +869,21 @@ Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties) {
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
std::unique_ptr<FileWriter> writer;
- RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties, &writer));
+ RETURN_NOT_OK(FileWriter::Open(
+ *table.schema(), pool, sink, properties, arrow_properties, &writer));
RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
return writer->Close();
}
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties) {
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
- return WriteTable(table, pool, wrapper, chunk_size, properties);
+ return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
}
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 3916298..4f7d2b4 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -40,6 +40,43 @@ class Table;
namespace parquet {
namespace arrow {
+// Options that control how Arrow data is written to Parquet. Instances are
+// immutable and can only be obtained from the nested Builder.
+class PARQUET_EXPORT ArrowWriterProperties {
+ public:
+  // Collects option values and produces an ArrowWriterProperties from them.
+  // Setters return `this` so calls can be chained.
+  class Builder {
+   public:
+    Builder() : write_nanos_as_int96_{false} {}
+    virtual ~Builder() {}
+
+    // Turns INT96 timestamp writing off (the state a fresh Builder starts in).
+    Builder* disable_deprecated_int96_timestamps() {
+      write_nanos_as_int96_ = false;
+      return this;
+    }
+
+    // Turns INT96 timestamp writing on.
+    Builder* enable_deprecated_int96_timestamps() {
+      write_nanos_as_int96_ = true;
+      return this;
+    }
+
+    // Creates a properties object holding the currently selected options.
+    std::shared_ptr<ArrowWriterProperties> build() {
+      // The constructor is private, so the object is created with new here
+      // and handed straight to the owning shared_ptr.
+      std::shared_ptr<ArrowWriterProperties> properties(
+          new ArrowWriterProperties(write_nanos_as_int96_));
+      return properties;
+    }
+
+   private:
+    bool write_nanos_as_int96_;
+  };
+
+  // True when the builder had INT96 timestamp writing enabled.
+  bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
+
+ private:
+  explicit ArrowWriterProperties(bool write_nanos_as_int96)
+      : write_nanos_as_int96_(write_nanos_as_int96) {}
+
+  const bool write_nanos_as_int96_;
+};
+
+std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
+
/**
* Iterative API:
* Start a new RowGroup/Chunk with NewRowGroup
@@ -47,7 +84,9 @@ namespace arrow {
*/
class PARQUET_EXPORT FileWriter {
public:
- FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+ FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties());
static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<OutputStream>& sink,
@@ -55,8 +94,20 @@ class PARQUET_EXPORT FileWriter {
std::unique_ptr<FileWriter>* writer);
static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+ std::unique_ptr<FileWriter>* writer);
+
+ static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer);
+
+ static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink,
const std::shared_ptr<WriterProperties>& properties,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
std::unique_ptr<FileWriter>* writer);
/**
@@ -87,12 +138,39 @@ class PARQUET_EXPORT FileWriter {
::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+ const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties());
::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+ const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties());
+
+namespace internal {
+
+/**
+ * Timestamp conversion constants
+ */
+constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
+constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
+
+/**
+ * Converts nanosecond timestamps to Impala (Int96) format
+ */
+inline void NanosecondsToImpalaTimestamp(
+    const int64_t nanoseconds, Int96* impala_timestamp) {
+  // Whole days, shifted by kJulianEpochOffsetDays, go into the third word.
+  const int64_t julian_days =
+      (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays;
+  impala_timestamp->value[2] = static_cast<uint32_t>(julian_days);
+
+  // The remainder (nanoseconds within the day) fills the first two words, low
+  // word first. It is stored as two 32-bit halves - the same bytes a little-endian
+  // int64_t store would produce - instead of through an int64_t* obtained by
+  // reinterpret_cast: Int96 appears to hold 32-bit words, so such a pointer may be
+  // misaligned and the write would violate strict aliasing.
+  const uint64_t last_day_nanos =
+      static_cast<uint64_t>(nanoseconds % kNanosecondsPerDay);
+  impala_timestamp->value[0] = static_cast<uint32_t>(last_day_nanos & 0xFFFFFFFFu);
+  impala_timestamp->value[1] = static_cast<uint32_t>(last_day_nanos >> 32);
+  // NOTE(review): `/` and `%` truncate toward zero, so inputs before 1970 yield a
+  // negative within-day remainder - confirm whether readers expect a floored split.
+}
+
+} // namespace internal
} // namespace arrow
} // namespace parquet