Posted to commits@parquet.apache.org by we...@apache.org on 2017/06/23 16:20:42 UTC

parquet-cpp git commit: PARQUET-1041: Support Arrow's NullArray

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 05b00fa7e -> 6faff712d


PARQUET-1041: Support Arrow's NullArray

Closes #358. On top of that patch, this only adds an Arrow version bump to pick up ARROW-1143.

Author: Uwe L. Korn <uw...@apache.org>
Author: Wes McKinney <we...@twosigma.com>

Closes #360 from wesm/PARQUET-1041 and squashes the following commits:

3f8f0bc [Wes McKinney] Bump Arrow version to master
c134bd6 [Uwe L. Korn] Fix int conversion
1def8a4 [Uwe L. Korn] PARQUET-1041: Support Arrow's NullArray
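
For context, a minimal usage sketch of the round trip this change enables, condensed from the new TestNullParquetIO.NullColumn test in the diff below. It is not part of the commit; the table construction and the reader-side calls (the ::arrow::Table/Column constructors, parquet::arrow::OpenFile, ::arrow::io::BufferReader, InMemoryOutputStream::GetBuffer) and the header locations are assumptions based on the Arrow and parquet-cpp APIs at the time of this commit.

#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/io/memory.h"        // ::arrow::io::BufferReader
#include "parquet/arrow/reader.h"   // parquet::arrow::OpenFile, FileReader
#include "parquet/arrow/writer.h"   // parquet::arrow::WriteTable
#include "parquet/util/memory.h"    // parquet::InMemoryOutputStream (assumed location)

// Write a one-column table whose field has Arrow's null type, then read it back.
::arrow::Status RoundTripNullColumn(int64_t length) {
  // An Arrow null column carries no values, only a length.
  auto field = ::arrow::field("nulls", ::arrow::null());
  auto schema = std::make_shared<::arrow::Schema>(
      std::vector<std::shared_ptr<::arrow::Field>>{field});
  auto values = std::make_shared<::arrow::NullArray>(length);
  auto column = std::make_shared<::arrow::Column>(field, values);
  auto table = std::make_shared<::arrow::Table>(
      schema, std::vector<std::shared_ptr<::arrow::Column>>{column});

  // On disk this becomes an optional INT32 column annotated with LogicalType::NA,
  // with every definition level set to 0 (all values null) and no data written.
  auto sink = std::make_shared<parquet::InMemoryOutputStream>();
  ::arrow::Status st = parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(),
      sink, length, parquet::default_writer_properties());
  if (!st.ok()) return st;

  // Reading maps LogicalType::NA back to ::arrow::null(), yielding a NullArray again.
  std::unique_ptr<parquet::arrow::FileReader> reader;
  st = parquet::arrow::OpenFile(
      std::make_shared<::arrow::io::BufferReader>(sink->GetBuffer()),
      ::arrow::default_memory_pool(), &reader);
  if (!st.ok()) return st;

  std::shared_ptr<::arrow::Table> out;
  st = reader->ReadTable(&out);
  if (!st.ok()) return st;

  return values->Equals(out->column(0)->data()->chunk(0))
             ? ::arrow::Status::OK()
             : ::arrow::Status::Invalid("null column did not round trip");
}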


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/6faff712
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/6faff712
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/6faff712

Branch: refs/heads/master
Commit: 6faff712d15d999c08678a5a23b9f689f2f085d8
Parents: 05b00fa
Author: Uwe L. Korn <uw...@apache.org>
Authored: Fri Jun 23 12:20:36 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Jun 23 12:20:36 2017 -0400

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake       |  2 +-
 src/parquet/arrow/arrow-reader-writer-test.cc | 21 ++++++++++++++++++
 src/parquet/arrow/reader.cc                   | 11 ++++++----
 src/parquet/arrow/schema.cc                   | 12 ++++++++---
 src/parquet/arrow/writer.cc                   | 25 +++++++++++++++++++++-
 src/parquet/file/metadata.cc                  |  1 +
 src/parquet/schema.cc                         |  3 +++
 src/parquet/types.h                           |  3 ++-
 8 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index f958620..2b24e93 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -520,7 +520,7 @@ if (NOT ARROW_FOUND)
   endif()
 
   if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
-    set(ARROW_VERSION "a8f8ba0cbcf5f596f042e90b7a208e7a0c3925b7")
+    set(ARROW_VERSION "e209e5865ea58e57925cae24d4bf3f63d58ee21d")
   else()
     set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
   endif()

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 97bb19b..3beca35 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -802,6 +802,27 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
 }
 
+using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
+
+TEST_F(TestNullParquetIO, NullColumn) {
+  std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  std::unique_ptr<FileReader> reader;
+  this->ReaderFromSink(&reader);
+  this->ReadTableFromFile(std::move(reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
 template <typename T>
 using ParquetCDataType = typename ParquetDataType<T>::c_type;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 7c1b381..ef9ac34 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -495,10 +495,6 @@ Status FileReader::Impl::ReadTable(
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
-  int num_fields = static_cast<int>(schema->num_fields());
-  int nthreads = std::min<int>(num_threads_, num_fields);
-  std::vector<std::shared_ptr<Column>> columns(num_fields);
-
   // We only need to read schema fields which have columns indicated
   // in the indices vector
   std::vector<int> field_indices;
@@ -507,6 +503,7 @@ Status FileReader::Impl::ReadTable(
     return Status::Invalid("Invalid column index");
   }
 
+  std::vector<std::shared_ptr<Column>> columns(field_indices.size());
   auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
     RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -514,6 +511,8 @@ Status FileReader::Impl::ReadTable(
     return Status::OK();
   };
 
+  int num_fields = static_cast<int>(field_indices.size());
+  int nthreads = std::min<int>(num_threads_, num_fields);
   if (nthreads == 1) {
     for (int i = 0; i < num_fields; i++) {
       RETURN_NOT_OK(ReadColumnFunc(i));
@@ -1262,6 +1261,10 @@ Status PrimitiveImpl::NextBatch(
   }
 
   switch (field_->type()->id()) {
+    case ::arrow::Type::NA:
+      *out = std::make_shared<::arrow::NullArray>(batch_size);
+      return Status::OK();
+      break;
     TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
     TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
     TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index a78a23b..2a4ddcd 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -166,6 +166,11 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
 }
 
 Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
+  if (primitive->logical_type() == LogicalType::NA) {
+    *out = ::arrow::null();
+    return Status::OK();
+  }
+
   switch (primitive->physical_type()) {
     case ParquetType::BOOLEAN:
       *out = ::arrow::boolean();
@@ -410,9 +415,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
   int length = -1;
 
   switch (field->type()->id()) {
-    // TODO:
-    // case ArrowType::NA:
-    // break;
+    case ArrowType::NA:
+      type = ParquetType::INT32;
+      logical_type = LogicalType::NA;
+      break;
     case ArrowType::BOOL:
       type = ParquetType::BOOLEAN;
       break;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 3344d1b..af4f754 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -62,6 +62,15 @@ class LevelBuilder {
 
   Status VisitInline(const Array& array);
 
+  Status Visit(const ::arrow::NullArray& array) {
+    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.length());
+    values_type_ = array.type_id();
+    values_array_ = &array;
+    return Status::OK();
+  }
+
   Status Visit(const ::arrow::PrimitiveArray& array) {
     array_offsets_.push_back(static_cast<int32_t>(array.offset()));
     valid_bitmaps_.push_back(array.null_bitmap_data());
@@ -98,7 +107,6 @@ class LevelBuilder {
         "Level generation for ArrowTypePrefix not supported yet"); \
   }
 
-  NOT_IMPLEMENTED_VISIT(Null)
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
   NOT_IMPLEMENTED_VISIT(Decimal)
@@ -141,6 +149,8 @@ class LevelBuilder {
             reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
         if (array.null_count() == 0) {
           std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
+        } else if (array.null_count() == array.length()) {
+          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
         } else {
           const uint8_t* valid_bits = array.null_bitmap_data();
           INIT_BITSET(valid_bits, static_cast<int>(array.offset()));
@@ -510,6 +520,18 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
 }
 
 template <>
+Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
+
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+template <>
 Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
     ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels) {
@@ -639,6 +661,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
             column_writer, values_array, num_levels, def_levels, rep_levels);
       }
     }
+      WRITE_BATCH_CASE(NA, NullType, Int32Type)
       WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
       WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
       WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index aea7a74..b37ef4f 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -76,6 +76,7 @@ SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
     case LogicalType::BSON:
     case LogicalType::JSON:
       return SortOrder::UNSIGNED;
+    case LogicalType::NA:
     case LogicalType::DECIMAL:
     case LogicalType::LIST:
     case LogicalType::MAP:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index 1209ad1..4efa0b2 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -190,6 +190,9 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
         throw ParquetException(ss.str());
       }
       break;
+    case LogicalType::NA:
+      // NA can annotate any type
+      break;
     default:
       ss << LogicalTypeToString(logical_type);
       ss << " can not be applied to a primitive type";

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 2b9b11f..8504f5d 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -81,7 +81,8 @@ struct LogicalType {
     INT_64,
     JSON,
     BSON,
-    INTERVAL
+    INTERVAL,
+    NA = 25
   };
 };
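
At the schema level, the mapping introduced here stores an Arrow null column as an optional INT32 primitive annotated with the new NA logical type, and FromPrimitive maps that annotation back to ::arrow::null() on read. A small sketch of the resulting schema node, assuming the public PrimitiveNode::Make overload that accepts a logical type:

#include "parquet/schema.h"
#include "parquet/types.h"

// Sketch only: the node shape FieldToNode produces for an Arrow null field --
// physical type INT32, annotated with the new LogicalType::NA.
parquet::schema::NodePtr MakeNullColumnNode() {
  return parquet::schema::PrimitiveNode::Make(
      "nulls", parquet::Repetition::OPTIONAL,
      parquet::Type::INT32, parquet::LogicalType::NA);
}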