You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/05/02 13:26:48 UTC

parquet-cpp git commit: PARQUET-965: Add FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 893af978a -> 84ffae5c6


PARQUET-965: Add FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow

Decimal support is left for future commits.

This may also help with ARROW-901.

Author: Xianjin YE <ad...@gmail.com>

Closes #315 from advancedxy/PARQUET-965 and squashes the following commits:

402b059 [Xianjin YE] Rewording some comments and minor fix to avoid realloc in Resize
30f3705 [Xianjin YE] Fix FLBA to fixed_size_binary schema convert test failure.
e48a5f5 [Xianjin YE] Add support for FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/84ffae5c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/84ffae5c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/84ffae5c

Branch: refs/heads/master
Commit: 84ffae5c60fd5c5a31f62110712d6d38807b68da
Parents: 893af97
Author: Xianjin YE <ad...@gmail.com>
Authored: Tue May 2 09:26:41 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue May 2 09:26:41 2017 -0400

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 23 ++++++--
 src/parquet/arrow/arrow-schema-test.cc        |  3 +-
 src/parquet/arrow/reader.cc                   | 61 ++++++++++++++++++++++
 src/parquet/arrow/reader.h                    |  2 +-
 src/parquet/arrow/schema.cc                   |  8 ++-
 src/parquet/arrow/test-util.h                 | 46 ++++++++++++++++
 src/parquet/arrow/writer.cc                   | 34 ++++++++++++
 7 files changed, 171 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 1d87606..4c351b4 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -128,6 +128,8 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) {
       return ParquetType::BYTE_ARRAY;
     case ArrowId::STRING:
       return ParquetType::BYTE_ARRAY;
+    case ArrowId::FIXED_SIZE_BINARY:
+      return ParquetType::FIXED_LEN_BYTE_ARRAY;
     case ArrowId::DATE32:
       return ParquetType::INT32;
     case ArrowId::DATE64:
@@ -265,9 +267,15 @@ struct test_traits<::arrow::BinaryType> {
   static std::string const value;
 };
 
+template <>
+struct test_traits<::arrow::FixedSizeBinaryType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY;
+  static std::string const value;
+};
+
 const std::string test_traits<::arrow::StringType>::value("Test");
 const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03");
-
+const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed");
 template <typename T>
 using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 
@@ -306,8 +314,17 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
 
 static std::shared_ptr<GroupNode> MakeSimpleSchema(
     const ::arrow::DataType& type, Repetition::type repetition) {
+  int byte_width;
+  // Decimal is not implemented yet.
+  switch (type.id()) {
+    case ::arrow::Type::FIXED_SIZE_BINARY:
+      byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
+      break;
+    default:
+      byte_width = -1;
+  }
   auto pnode = PrimitiveNode::Make(
-      "column1", repetition, get_physical_type(type), get_logical_type(type));
+      "column1", repetition, get_physical_type(type), get_logical_type(type), byte_width);
   NodePtr node_ =
       GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
   return std::static_pointer_cast<GroupNode>(node_);
@@ -423,7 +440,7 @@ class TestParquetIO : public ::testing::Test {
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
     ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType>
+    ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index a8a8c09..b56646e 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -151,7 +151,8 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
 
   parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
       ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12));
-  arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY));
+  arrow_fields.push_back(
+      std::make_shared<Field>("flba-binary", ::arrow::fixed_size_binary(12)));
 
   auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 06e5e22..6d8c4ff 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -228,6 +228,9 @@ class ColumnReader::Impl {
   Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
 
   template <typename ArrowType>
+  Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
+
+  template <typename ArrowType>
   Status InitDataBuffer(int batch_size);
   Status InitValidBits(int batch_size);
   template <typename ArrowType, typename ParquetType>
@@ -1019,6 +1022,58 @@ Status ColumnReader::Impl::ReadByteArrayBatch(
   return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
 }
 
+template <typename ArrowType>
+Status ColumnReader::Impl::ReadFLBABatch(
+    int batch_size, int byte_width, std::shared_ptr<Array>* out) {
+  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+  int total_levels_read = 0;
+  if (descr_->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  if (descr_->max_repetition_level() > 0) {
+    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+
+  int values_to_read = batch_size;
+  BuilderType builder(pool_, ::arrow::fixed_size_binary(byte_width));
+  while ((values_to_read > 0) && column_reader_) {
+    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
+    auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(
+        levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read,
+            rep_levels + total_levels_read, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      for (int64_t i = 0; i < levels_read; i++) {
+        RETURN_NOT_OK(builder.Append(values[i].ptr));
+      }
+    } else {
+      int values_idx = 0;
+      int nullable_elements = descr_->schema_node()->is_optional();
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (nullable_elements &&
+            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
+          RETURN_NOT_OK(builder.AppendNull());
+        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
+          RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
+          values_idx++;
+        }
+      }
+      total_levels_read += levels_read;
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+
+  RETURN_NOT_OK(builder.Finish(out));
+  // Check if we should transform this array into an list array.
+  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
+}
+
 template <>
 Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
@@ -1059,6 +1114,12 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
     TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
     TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+    case ::arrow::Type::FIXED_SIZE_BINARY: {
+      int32_t byte_width =
+          static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
+      return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
+      break;
+    }
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index f12acaf..24601b8 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -33,7 +33,7 @@ class MemoryPool;
 class RowBatch;
 class Status;
 class Table;
-}
+}  // namespace arrow
 
 namespace parquet {
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 4326161..6aeff17 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -73,7 +73,7 @@ static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
 static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
   switch (node->logical_type()) {
     case LogicalType::NONE:
-      *out = ::arrow::binary();
+      *out = ::arrow::fixed_size_binary(node->type_length());
       break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
@@ -469,6 +469,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     case ArrowType::BINARY:
       type = ParquetType::BYTE_ARRAY;
       break;
+    case ArrowType::FIXED_SIZE_BINARY: {
+      type = ParquetType::FIXED_LEN_BYTE_ARRAY;
+      auto fixed_size_binary_type =
+          static_cast<::arrow::FixedSizeBinaryType*>(field->type().get());
+      length = fixed_size_binary_type->byte_width();
+    } break;
     case ArrowType::DATE32:
       type = ParquetType::INT32;
       logical_type = LogicalType::DATE;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 8bcd314..388250e 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -44,6 +44,9 @@ template <typename ArrowType>
 using is_arrow_binary = std::is_same<ArrowType, ::arrow::BinaryType>;
 
 template <typename ArrowType>
+using is_arrow_fixed_size_binary = std::is_same<ArrowType, ::arrow::FixedSizeBinaryType>;
+
+template <typename ArrowType>
 using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;
 
 template <class ArrowType>
@@ -98,6 +101,19 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) {
   return builder.Finish(out);
 }
 
+template <typename ArrowType>
+typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+  // set byte_width to the length of "fixed": 5
+  // todo: find a way to generate test data with more diversity.
+  BuilderType builder(::arrow::default_memory_pool(), ::arrow::fixed_size_binary(5));
+  for (size_t i = 0; i < size; i++) {
+    builder.Append("fixed");
+  }
+  return builder.Finish(out);
+}
+
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
@@ -201,6 +217,36 @@ NullableArray(
   return builder.Finish(out);
 }
 
+// This helper function only supports (size/2) nulls yet,
+// same as NullableArray<String|Binary>(..)
+template <typename ArrowType>
+typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type
+NullableArray(
+    size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+  const int byte_width = 10;
+  BuilderType builder(
+      ::arrow::default_memory_pool(), ::arrow::fixed_size_binary(byte_width));
+
+  const int kBufferSize = byte_width;
+  uint8_t buffer[kBufferSize];
+  for (size_t i = 0; i < size; i++) {
+    if (!valid_bytes[i]) {
+      builder.AppendNull();
+    } else {
+      ::arrow::test::random_bytes(kBufferSize, seed + i, buffer);
+      builder.Append(buffer);
+    }
+  }
+  return builder.Finish(out);
+}
+
 // This helper function only supports (size/2) nulls yet.
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 8b0a271..2ebeb4a 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -31,6 +31,7 @@
 
 using arrow::Array;
 using arrow::BinaryArray;
+using arrow::FixedSizeBinaryArray;
 using arrow::BooleanArray;
 using arrow::Int16Array;
 using arrow::Int16Builder;
@@ -549,6 +550,38 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
+  auto data = static_cast<const FixedSizeBinaryArray*>(array.get());
+  auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+
+  auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+
+  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+    // no nulls, just dump the data
+    // todo(advancedxy): use a writeBatch to avoid this step
+    for (int64_t i = 0; i < data->length(); i++) {
+      buffer_ptr[i] = FixedLenByteArray(data->GetValue(i));
+    }
+    PARQUET_CATCH_NOT_OK(
+        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  } else {
+    int buffer_idx = 0;
+    for (int64_t i = 0; i < data->length(); i++) {
+      if (!data->IsNull(i)) {
+        buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i));
+      }
+    }
+    PARQUET_CATCH_NOT_OK(
+        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
 Status FileWriter::Impl::Close() {
   if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
   PARQUET_CATCH_NOT_OK(writer_->Close());
@@ -618,6 +651,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
       WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
       WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+      WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
       WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
       WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
       WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)