You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/06/23 16:20:42 UTC
parquet-cpp git commit: PARQUET-1041: Support Arrow's NullArray
Repository: parquet-cpp
Updated Branches:
refs/heads/master 05b00fa7e -> 6faff712d
PARQUET-1041: Support Arrow's NullArray
Closes #358. Compared with #358, this pull request only adds an Arrow version bump to pick up ARROW-1143.
Author: Uwe L. Korn <uw...@apache.org>
Author: Wes McKinney <we...@twosigma.com>
Closes #360 from wesm/PARQUET-1041 and squashes the following commits:
3f8f0bc [Wes McKinney] Bump Arrow version to master
c134bd6 [Uwe L. Korn] Fix int conversion
1def8a4 [Uwe L. Korn] PARQUET-1041: Support Arrow's NullArray
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/6faff712
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/6faff712
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/6faff712
Branch: refs/heads/master
Commit: 6faff712d15d999c08678a5a23b9f689f2f085d8
Parents: 05b00fa
Author: Uwe L. Korn <uw...@apache.org>
Authored: Fri Jun 23 12:20:36 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Jun 23 12:20:36 2017 -0400
----------------------------------------------------------------------
cmake_modules/ThirdpartyToolchain.cmake | 2 +-
src/parquet/arrow/arrow-reader-writer-test.cc | 21 ++++++++++++++++++
src/parquet/arrow/reader.cc | 11 ++++++----
src/parquet/arrow/schema.cc | 12 ++++++++---
src/parquet/arrow/writer.cc | 25 +++++++++++++++++++++-
src/parquet/file/metadata.cc | 1 +
src/parquet/schema.cc | 3 +++
src/parquet/types.h | 3 ++-
8 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index f958620..2b24e93 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -520,7 +520,7 @@ if (NOT ARROW_FOUND)
endif()
if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
- set(ARROW_VERSION "a8f8ba0cbcf5f596f042e90b7a208e7a0c3925b7")
+ set(ARROW_VERSION "e209e5865ea58e57925cae24d4bf3f63d58ee21d")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 97bb19b..3beca35 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -802,6 +802,27 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}
+using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
+
+TEST_F(TestNullParquetIO, NullColumn) {
+ std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+ values->length(), default_writer_properties()));
+
+ std::shared_ptr<Table> out;
+ std::unique_ptr<FileReader> reader;
+ this->ReaderFromSink(&reader);
+ this->ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(100, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 7c1b381..ef9ac34 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -495,10 +495,6 @@ Status FileReader::Impl::ReadTable(
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
- int num_fields = static_cast<int>(schema->num_fields());
- int nthreads = std::min<int>(num_threads_, num_fields);
- std::vector<std::shared_ptr<Column>> columns(num_fields);
-
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
@@ -507,6 +503,7 @@ Status FileReader::Impl::ReadTable(
return Status::Invalid("Invalid column index");
}
+ std::vector<std::shared_ptr<Column>> columns(field_indices.size());
auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) {
std::shared_ptr<Array> array;
RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array));
@@ -514,6 +511,8 @@ Status FileReader::Impl::ReadTable(
return Status::OK();
};
+ int num_fields = static_cast<int>(field_indices.size());
+ int nthreads = std::min<int>(num_threads_, num_fields);
if (nthreads == 1) {
for (int i = 0; i < num_fields; i++) {
RETURN_NOT_OK(ReadColumnFunc(i));
@@ -1262,6 +1261,10 @@ Status PrimitiveImpl::NextBatch(
}
switch (field_->type()->id()) {
+ case ::arrow::Type::NA:
+ *out = std::make_shared<::arrow::NullArray>(batch_size);
+ return Status::OK();
+ break;
TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index a78a23b..2a4ddcd 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -166,6 +166,11 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
}
Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
+ if (primitive->logical_type() == LogicalType::NA) {
+ *out = ::arrow::null();
+ return Status::OK();
+ }
+
switch (primitive->physical_type()) {
case ParquetType::BOOLEAN:
*out = ::arrow::boolean();
@@ -410,9 +415,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
int length = -1;
switch (field->type()->id()) {
- // TODO:
- // case ArrowType::NA:
- // break;
+ case ArrowType::NA:
+ type = ParquetType::INT32;
+ logical_type = LogicalType::NA;
+ break;
case ArrowType::BOOL:
type = ParquetType::BOOLEAN;
break;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 3344d1b..af4f754 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -62,6 +62,15 @@ class LevelBuilder {
Status VisitInline(const Array& array);
+ Status Visit(const ::arrow::NullArray& array) {
+ array_offsets_.push_back(static_cast<int32_t>(array.offset()));
+ valid_bitmaps_.push_back(array.null_bitmap_data());
+ null_counts_.push_back(array.length());
+ values_type_ = array.type_id();
+ values_array_ = &array;
+ return Status::OK();
+ }
+
Status Visit(const ::arrow::PrimitiveArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
@@ -98,7 +107,6 @@ class LevelBuilder {
"Level generation for ArrowTypePrefix not supported yet"); \
}
- NOT_IMPLEMENTED_VISIT(Null)
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)
NOT_IMPLEMENTED_VISIT(Decimal)
@@ -141,6 +149,8 @@ class LevelBuilder {
reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
+ } else if (array.null_count() == array.length()) {
+ std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
} else {
const uint8_t* valid_bits = array.null_bitmap_data();
INIT_BITSET(valid_bits, static_cast<int>(array.offset()));
@@ -510,6 +520,18 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
}
template <>
+Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+ ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+ const int16_t* def_levels, const int16_t* rep_levels) {
+ auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
+
+ PARQUET_CATCH_NOT_OK(
+ writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+ PARQUET_CATCH_NOT_OK(writer->Close());
+ return Status::OK();
+}
+
+template <>
Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels) {
@@ -639,6 +661,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
column_writer, values_array, num_levels, def_levels, rep_levels);
}
}
+ WRITE_BATCH_CASE(NA, NullType, Int32Type)
WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index aea7a74..b37ef4f 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -76,6 +76,7 @@ SortOrder get_sort_order(LogicalType::type converted, Type::type primitive) {
case LogicalType::BSON:
case LogicalType::JSON:
return SortOrder::UNSIGNED;
+ case LogicalType::NA:
case LogicalType::DECIMAL:
case LogicalType::LIST:
case LogicalType::MAP:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index 1209ad1..4efa0b2 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -190,6 +190,9 @@ PrimitiveNode::PrimitiveNode(const std::string& name, Repetition::type repetitio
throw ParquetException(ss.str());
}
break;
+ case LogicalType::NA:
+ // NA can annotate any type
+ break;
default:
ss << LogicalTypeToString(logical_type);
ss << " can not be applied to a primitive type";
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/6faff712/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 2b9b11f..8504f5d 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -81,7 +81,8 @@ struct LogicalType {
INT_64,
JSON,
BSON,
- INTERVAL
+ INTERVAL,
+ NA = 25
};
};