You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/05/02 13:26:48 UTC
parquet-cpp git commit: PARQUET-965: Add FIXED_LEN_BYTE_ARRAY read
and write support in parquet-arrow
Repository: parquet-cpp
Updated Branches:
refs/heads/master 893af978a -> 84ffae5c6
PARQUET-965: Add FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow
The decimal support is left for further commits.
This may also help ARROW-901.
Author: Xianjin YE <ad...@gmail.com>
Closes #315 from advancedxy/PARQUET-965 and squashes the following commits:
402b059 [Xianjin YE] Rewording some comments and minor fix to avoid realloc in Resize
30f3705 [Xianjin YE] Fix FLBA to fixed_size_binary schema convert test failure.
e48a5f5 [Xianjin YE] Add support for FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/84ffae5c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/84ffae5c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/84ffae5c
Branch: refs/heads/master
Commit: 84ffae5c60fd5c5a31f62110712d6d38807b68da
Parents: 893af97
Author: Xianjin YE <ad...@gmail.com>
Authored: Tue May 2 09:26:41 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue May 2 09:26:41 2017 -0400
----------------------------------------------------------------------
src/parquet/arrow/arrow-reader-writer-test.cc | 23 ++++++--
src/parquet/arrow/arrow-schema-test.cc | 3 +-
src/parquet/arrow/reader.cc | 61 ++++++++++++++++++++++
src/parquet/arrow/reader.h | 2 +-
src/parquet/arrow/schema.cc | 8 ++-
src/parquet/arrow/test-util.h | 46 ++++++++++++++++
src/parquet/arrow/writer.cc | 34 ++++++++++++
7 files changed, 171 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 1d87606..4c351b4 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -128,6 +128,8 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) {
return ParquetType::BYTE_ARRAY;
case ArrowId::STRING:
return ParquetType::BYTE_ARRAY;
+ case ArrowId::FIXED_SIZE_BINARY:
+ return ParquetType::FIXED_LEN_BYTE_ARRAY;
case ArrowId::DATE32:
return ParquetType::INT32;
case ArrowId::DATE64:
@@ -265,9 +267,15 @@ struct test_traits<::arrow::BinaryType> {
static std::string const value;
};
+template <>
+struct test_traits<::arrow::FixedSizeBinaryType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
+ static std::string const value;
+};
+
const std::string test_traits<::arrow::StringType>::value("Test");
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");
-
+const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");
template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
@@ -306,8 +314,17 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
static std::shared_ptr<GroupNode> MakeSimpleSchema(
const ::arrow::DataType& type, Repetition::type repetition) {
+ int byte_width;
+ // Decimal is not implemented yet.
+ switch (type.id()) {
+ case ::arrow::Type::FIXED_SIZE_BINARY:
+ byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
+ break;
+ default:
+ byte_width = -1;
+ }
auto pnode = PrimitiveNode::Make(
- "column1", repetition, get_physical_type(type), get_logical_type(type));
+ "column1", repetition, get_physical_type(type), get_logical_type(type), byte_width);
NodePtr node_ =
GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
return std::static_pointer_cast<GroupNode>(node_);
@@ -423,7 +440,7 @@ class TestParquetIO : public ::testing::Test {
typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType,
- ::arrow::StringType, ::arrow::BinaryType>
+ ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
TestTypes;
TYPED_TEST_CASE(TestParquetIO, TestTypes);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index a8a8c09..b56646e 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -151,7 +151,8 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12));
- arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY));
+ arrow_fields.push_back(
+ std::make_shared<Field>("flba-binary", ::arrow::fixed_size_binary(12)));
auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 06e5e22..6d8c4ff 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -228,6 +228,9 @@ class ColumnReader::Impl {
Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
template <typename ArrowType>
+ Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
+
+ template <typename ArrowType>
Status InitDataBuffer(int batch_size);
Status InitValidBits(int batch_size);
template <typename ArrowType, typename ParquetType>
@@ -1019,6 +1022,58 @@ Status ColumnReader::Impl::ReadByteArrayBatch(
return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
}
+template <typename ArrowType>
+Status ColumnReader::Impl::ReadFLBABatch(
+ int batch_size, int byte_width, std::shared_ptr<Array>* out) {
+ using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+ int total_levels_read = 0;
+ if (descr_->max_definition_level() > 0) {
+ RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+ }
+ if (descr_->max_repetition_level() > 0) {
+ RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+ }
+ int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+ int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+
+ int values_to_read = batch_size;
+ BuilderType builder(pool_, ::arrow::fixed_size_binary(byte_width));
+ while ((values_to_read > 0) && column_reader_) {
+ RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
+ auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
+ int64_t values_read;
+ int64_t levels_read;
+ auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
+ PARQUET_CATCH_NOT_OK(
+ levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read,
+ rep_levels + total_levels_read, values, &values_read));
+ values_to_read -= levels_read;
+ if (descr_->max_definition_level() == 0) {
+ for (int64_t i = 0; i < levels_read; i++) {
+ RETURN_NOT_OK(builder.Append(values[i].ptr));
+ }
+ } else {
+ int values_idx = 0;
+ int nullable_elements = descr_->schema_node()->is_optional();
+ for (int64_t i = 0; i < levels_read; i++) {
+ if (nullable_elements &&
+ (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
+ RETURN_NOT_OK(builder.AppendNull());
+ } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
+ RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
+ values_idx++;
+ }
+ }
+ total_levels_read += levels_read;
+ }
+ if (!column_reader_->HasNext()) { NextRowGroup(); }
+ }
+
+ RETURN_NOT_OK(builder.Finish(out));
+ // Check if we should transform this array into an list array.
+ return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
+}
+
template <>
Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
@@ -1059,6 +1114,12 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+ case ::arrow::Type::FIXED_SIZE_BINARY: {
+ int32_t byte_width =
+ static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
+ return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
+ break;
+ }
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index f12acaf..24601b8 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -33,7 +33,7 @@ class MemoryPool;
class RowBatch;
class Status;
class Table;
-}
+} // namespace arrow
namespace parquet {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 4326161..6aeff17 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -73,7 +73,7 @@ static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
switch (node->logical_type()) {
case LogicalType::NONE:
- *out = ::arrow::binary();
+ *out = ::arrow::fixed_size_binary(node->type_length());
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
@@ -469,6 +469,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
case ArrowType::BINARY:
type = ParquetType::BYTE_ARRAY;
break;
+ case ArrowType::FIXED_SIZE_BINARY: {
+ type = ParquetType::FIXED_LEN_BYTE_ARRAY;
+ auto fixed_size_binary_type =
+ static_cast<::arrow::FixedSizeBinaryType*>(field->type().get());
+ length = fixed_size_binary_type->byte_width();
+ } break;
case ArrowType::DATE32:
type = ParquetType::INT32;
logical_type = LogicalType::DATE;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 8bcd314..388250e 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -44,6 +44,9 @@ template <typename ArrowType>
using is_arrow_binary = std::is_same<ArrowType, ::arrow::BinaryType>;
template <typename ArrowType>
+using is_arrow_fixed_size_binary = std::is_same<ArrowType, ::arrow::FixedSizeBinaryType>;
+
+template <typename ArrowType>
using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;
template <class ArrowType>
@@ -98,6 +101,19 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
return builder.Finish(out);
}
+template <typename ArrowType>
+typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+ using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+ // set byte_width to the length of "fixed": 5
+ // todo: find a way to generate test data with more diversity.
+ BuilderType builder(::arrow::default_memory_pool(), ::arrow::fixed_size_binary(5));
+ for (size_t i = 0; i < size; i++) {
+ builder.Append("fixed");
+ }
+ return builder.Finish(out);
+}
+
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
@@ -201,6 +217,36 @@ NullableArray(
return builder.Finish(out);
}
+// This helper function only supports (size/2) nulls yet,
+// same as NullableArray<String|Binary>(..)
+template <typename ArrowType>
+typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type
+NullableArray(
+ size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
+ std::vector<uint8_t> valid_bytes(size, 1);
+
+ for (size_t i = 0; i < num_nulls; i++) {
+ valid_bytes[i * 2] = 0;
+ }
+
+ using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+ const int byte_width = 10;
+ BuilderType builder(
+ ::arrow::default_memory_pool(), ::arrow::fixed_size_binary(byte_width));
+
+ const int kBufferSize = byte_width;
+ uint8_t buffer[kBufferSize];
+ for (size_t i = 0; i < size; i++) {
+ if (!valid_bytes[i]) {
+ builder.AppendNull();
+ } else {
+ ::arrow::test::random_bytes(kBufferSize, seed + i, buffer);
+ builder.Append(buffer);
+ }
+ }
+ return builder.Finish(out);
+}
+
// This helper function only supports (size/2) nulls yet.
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 8b0a271..2ebeb4a 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -31,6 +31,7 @@
using arrow::Array;
using arrow::BinaryArray;
+using arrow::FixedSizeBinaryArray;
using arrow::BooleanArray;
using arrow::Int16Array;
using arrow::Int16Builder;
@@ -549,6 +550,38 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
return Status::OK();
}
+template <>
+Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
+ ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels) {
+ RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
+ auto data = static_cast<const FixedSizeBinaryArray*>(array.get());
+ auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+
+ auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+
+ if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+ // no nulls, just dump the data
+ // todo(advancedxy): use a writeBatch to avoid this step
+ for (int64_t i = 0; i < data->length(); i++) {
+ buffer_ptr[i] = FixedLenByteArray(data->GetValue(i));
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ } else {
+ int buffer_idx = 0;
+ for (int64_t i = 0; i < data->length(); i++) {
+ if (!data->IsNull(i)) {
+ buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i));
+ }
+ }
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+ }
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
Status FileWriter::Impl::Close() {
if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
PARQUET_CATCH_NOT_OK(writer_->Close());
@@ -618,6 +651,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+ WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)