You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@parquet.apache.org by uw...@apache.org on 2018/04/17 17:07:49 UTC
[parquet-cpp] branch master updated: PARQUET-1268: Fix conversion of null list Arrow arrays
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 21b754a PARQUET-1268: Fix conversion of null list Arrow arrays
21b754a is described below
commit 21b754aacd712df1cee5af4a5777f142a02d91b9
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Apr 17 19:07:43 2018 +0200
PARQUET-1268: Fix conversion of null list Arrow arrays
Author: Antoine Pitrou <an...@python.org>
Closes #454 from pitrou/PARQUET-1268-null-list-conversion and squashes the following commits:
e4e7744 [Antoine Pitrou] PARQUET-1268: Fix conversion of null list Arrow arrays
---
src/parquet/arrow/arrow-reader-writer-test.cc | 61 +++++++++++++++++++++------
src/parquet/arrow/reader.cc | 16 +++----
src/parquet/arrow/writer.cc | 21 ++++++---
3 files changed, 66 insertions(+), 32 deletions(-)
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 79a393f..f2402df 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1000,23 +1000,56 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
TEST_F(TestNullParquetIO, NullColumn) {
- std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(SMALL_SIZE);
- std::shared_ptr<Table> table = MakeSimpleTable(values, true);
- this->sink_ = std::make_shared<InMemoryOutputStream>();
- ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
- values->length(), default_writer_properties()));
+ for (int32_t num_rows : {0, SMALL_SIZE}) {
+ std::shared_ptr<Array> values = std::make_shared<::arrow::NullArray>(num_rows);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true /* nullable */);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
- std::shared_ptr<Table> out;
- std::unique_ptr<FileReader> reader;
- this->ReaderFromSink(&reader);
- this->ReadTableFromFile(std::move(reader), &out);
- ASSERT_EQ(1, out->num_columns());
- ASSERT_EQ(100, out->num_rows());
+ const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+ chunk_size, default_writer_properties()));
- std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
- ASSERT_EQ(1, chunked_array->num_chunks());
+ std::shared_ptr<Table> out;
+ std::unique_ptr<FileReader> reader;
+ this->ReaderFromSink(&reader);
+ this->ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(num_rows, out->num_rows());
- internal::AssertArraysEqual(*values, *chunked_array->chunk(0));
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ internal::AssertArraysEqual(*values, *chunked_array->chunk(0));
+ }
+}
+
+TEST_F(TestNullParquetIO, NullListColumn) {
+ std::vector<int32_t> offsets1 = {0};
+ std::vector<int32_t> offsets2 = {0, 2, 2, 3, 115};
+ for (std::vector<int32_t> offsets : {offsets1, offsets2}) {
+ std::shared_ptr<Array> offsets_array, values_array, list_array;
+ ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offsets, &offsets_array);
+ values_array = std::make_shared<::arrow::NullArray>(offsets.back());
+ ASSERT_OK(::arrow::ListArray::FromArrays(*offsets_array, *values_array,
+ default_memory_pool(), &list_array));
+
+ std::shared_ptr<Table> table = MakeSimpleTable(list_array, false /* nullable */);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+
+ const int64_t chunk_size = std::max(static_cast<int64_t>(1), table->num_rows());
+ ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+ chunk_size, default_writer_properties()));
+
+ std::shared_ptr<Table> out;
+ std::unique_ptr<FileReader> reader;
+ this->ReaderFromSink(&reader);
+ this->ReadTableFromFile(std::move(reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(offsets.size() - 1, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ internal::AssertArraysEqual(*list_array, *chunked_array->chunk(0));
+ }
}
TEST_F(TestNullParquetIO, NullDictionaryColumn) {
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index dd58d7a..1f933e6 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -1235,17 +1235,6 @@ struct TransferFunctor<::arrow::Decimal128Type, Int64Type> {
} break;
Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
- if (!record_reader_->HasMoreData()) {
- // Exhausted all row groups.
- *out = nullptr;
- return Status::OK();
- }
-
- if (field_->type()->id() == ::arrow::Type::NA) {
- *out = std::make_shared<::arrow::NullArray>(records_to_read);
- return Status::OK();
- }
-
try {
// Pre-allocation gives much better performance for flat columns
record_reader_->Reserve(records_to_read);
@@ -1282,6 +1271,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
+ case ::arrow::Type::NA: {
+ *out = std::make_shared<::arrow::NullArray>(record_reader_->values_written());
+ RETURN_NOT_OK(WrapIntoListArray<Int32Type>(out));
+ break;
+ }
case ::arrow::Type::DECIMAL: {
switch (descr_->physical_type()) {
case ::parquet::Type::INT32: {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 5040e0c..a5dae3c 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -193,9 +193,9 @@ class LevelBuilder {
}
Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
- int32_t inner_offset = offsets_[rep_level][index];
- int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
- int64_t recursion_level = rep_level + 1;
+ const int32_t inner_offset = offsets_[rep_level][index];
+ const int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
+ const int64_t recursion_level = rep_level + 1;
if (inner_length == 0) {
return def_levels_.Append(def_level);
}
@@ -205,14 +205,21 @@ class LevelBuilder {
inner_length);
} else {
// We have reached the leaf: primitive list, handle remaining nullables
+ const bool nullable_level = nullable_[recursion_level];
+ const int64_t level_null_count = null_counts_[recursion_level];
+ const uint8_t* level_valid_bitmap = valid_bitmaps_[recursion_level];
+
for (int64_t i = 0; i < inner_length; i++) {
if (i > 0) {
RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
}
- if (nullable_[recursion_level] &&
- ((null_counts_[recursion_level] == 0) ||
- BitUtil::GetBit(valid_bitmaps_[recursion_level],
- inner_offset + i + array_offsets_[recursion_level]))) {
+ if (level_null_count && level_valid_bitmap == nullptr) {
+ // Special case: this is a null array (all elements are null)
+ RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
+ } else if (nullable_level && ((level_null_count == 0) ||
+ BitUtil::GetBit(level_valid_bitmap,
+ inner_offset + i + array_offsets_[recursion_level]))) {
+ // Non-null element in a null level
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
} else {
// This can be produced in two case:
--
To stop receiving notification emails like this one, please contact
uwe@apache.org.