You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/04 01:11:35 UTC
[arrow] branch master updated: PARQUET-1469: [C++] Fix data
corruption bug in parquet::internal::DefinitionLevelsToBitmap that was
triggered through random data
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c204d3e PARQUET-1469: [C++] Fix data corruption bug in parquet::internal::DefinitionLevelsToBitmap that was triggered through random data
c204d3e is described below
commit c204d3ec9a3cddd32bfa6c73fedd5ce0970b660a
Author: Wes McKinney <we...@apache.org>
AuthorDate: Mon Dec 3 19:11:28 2018 -0600
PARQUET-1469: [C++] Fix data corruption bug in parquet::internal::DefinitionLevelsToBitmap that was triggered through random data
I also refactored some code to aid with debugging
Author: Wes McKinney <we...@apache.org>
Closes #3080 from wesm/PARQUET-1469 and squashes the following commits:
5863562d0 <Wes McKinney> Fix bug in parquet::internal::DefinitionLevelsToBitmap that was triggered by the PRNG change. Some refactoring to assist with debugging
---
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 9 +-
cpp/src/parquet/arrow/record_reader.cc | 203 +++++++++++++---------
cpp/src/parquet/arrow/record_reader.h | 2 +
cpp/src/parquet/arrow/test-util.h | 38 ----
cpp/src/parquet/column_reader-test.cc | 29 ++++
cpp/src/parquet/column_reader.h | 6 +-
6 files changed, 163 insertions(+), 124 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 8aedd38..b8eb094 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1712,6 +1712,7 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
TEST(TestArrowReadWrite, ListLargeRecords) {
// PARQUET-1308: This test passed on Linux when num_rows was smaller
const int num_rows = 2000;
+ const int row_group_size = 100;
std::shared_ptr<Array> list_array;
std::shared_ptr<::DataType> list_type;
@@ -1723,8 +1724,8 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
std::shared_ptr<Table> table = Table::Make(schema, {list_array});
std::shared_ptr<Buffer> buffer;
- ASSERT_NO_FATAL_FAILURE(
- WriteTableToBuffer(table, 100, default_arrow_writer_properties(), &buffer));
+ ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+ default_arrow_writer_properties(), &buffer));
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -1736,7 +1737,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
- // Read chunked
+ // Read 1 record at a time
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(),
::parquet::default_reader_properties(), nullptr, &reader));
@@ -2263,7 +2264,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
const int num_trees = 3;
const int depth = 3;
#else
- const int num_trees = 10;
+ const int num_trees = 5;
const int depth = 5;
#endif
const int num_children = 3;
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc
index ce6fa2a..4a3cd52 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <iostream>
#include <memory>
#include <sstream>
#include <unordered_map>
@@ -59,6 +60,10 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}
+// The minimum number of repetition/definition levels to decode at a time, for
+// better vectorized performance when doing many smaller record reads
+constexpr int64_t kMinLevelBatchSize = 1024;
+
class RecordReader::RecordReaderImpl {
public:
RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
@@ -94,7 +99,88 @@ class RecordReader::RecordReaderImpl {
virtual ~RecordReaderImpl() = default;
- virtual int64_t ReadRecords(int64_t num_records) = 0;
+ virtual int64_t ReadRecordData(const int64_t num_records) = 0;
+
+ // Returns true if there are still values in this column.
+ bool HasNext() {
+ // Either there is no data page available yet, or the data page has been
+ // exhausted
+ if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+ if (!ReadNewPage() || num_buffered_values_ == 0) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ int64_t ReadRecords(int64_t num_records) {
+ // Delimit records, then read values at the end
+ int64_t records_read = 0;
+
+ if (levels_position_ < levels_written_) {
+ records_read += ReadRecordData(num_records);
+ }
+
+ int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
+
+ // If we are in the middle of a record, we continue until reaching the
+ // desired number of records or the end of the current record if we've found
+ // enough records
+ while (!at_record_start_ || records_read < num_records) {
+ // Is there more data to read in this row group?
+ if (!HasNext()) {
+ if (!at_record_start_) {
+ // We ended the row group while inside a record that we haven't seen
+ // the end of yet. So increment the record count for the last record in
+ // the row group
+ ++records_read;
+ at_record_start_ = true;
+ }
+ break;
+ }
+
+ /// We perform multiple batch reads until we either exhaust the row group
+ /// or observe the desired number of records
+ int64_t batch_size = std::min(level_batch_size, available_values_current_page());
+
+ // No more data in column
+ if (batch_size == 0) {
+ break;
+ }
+
+ if (max_def_level_ > 0) {
+ ReserveLevels(batch_size);
+
+ int16_t* def_levels = this->def_levels() + levels_written_;
+ int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+ // Not present for non-repeated fields
+ int64_t levels_read = 0;
+ if (max_rep_level_ > 0) {
+ levels_read = ReadDefinitionLevels(batch_size, def_levels);
+ if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
+ throw ParquetException("Number of decoded rep / def levels did not match");
+ }
+ } else if (max_def_level_ > 0) {
+ levels_read = ReadDefinitionLevels(batch_size, def_levels);
+ }
+
+ // Exhausted column chunk
+ if (levels_read == 0) {
+ break;
+ }
+
+ levels_written_ += levels_read;
+ records_read += ReadRecordData(num_records - records_read);
+ } else {
+ // No repetition or definition levels
+ batch_size = std::min(num_records - records_read, batch_size);
+ records_read += ReadRecordData(batch_size);
+ }
+ }
+
+ return records_read;
+ }
// Dictionary decoders must be reset when advancing row groups
virtual void ResetDecoders() = 0;
@@ -303,7 +389,11 @@ class RecordReader::RecordReaderImpl {
}
}
+ virtual void DebugPrintState() = 0;
+
protected:
+ virtual bool ReadNewPage() = 0;
+
const ColumnDescriptor* descr_;
::arrow::MemoryPool* pool_;
@@ -359,10 +449,6 @@ class RecordReader::RecordReaderImpl {
std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
};
-// The minimum number of repetition/definition levels to decode at a time, for
-// better vectorized performance when doing many smaller record reads
-constexpr int64_t kMinLevelBatchSize = 1024;
-
template <typename DType>
class TypedRecordReader : public RecordReader::RecordReaderImpl {
public:
@@ -390,7 +476,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
}
// Return number of logical records read
- int64_t ReadRecordData(const int64_t num_records) {
+ int64_t ReadRecordData(const int64_t num_records) override {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
@@ -434,85 +520,30 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
return records_read;
}
- // Returns true if there are still values in this column.
- bool HasNext() {
- // Either there is no data page available yet, or the data page has been
- // exhausted
- if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
- if (!ReadNewPage() || num_buffered_values_ == 0) {
- return false;
- }
- }
- return true;
- }
+ void DebugPrintState() override {
+ const int16_t* def_levels = this->def_levels();
+ const int16_t* rep_levels = this->rep_levels();
+ const int64_t total_levels_read = levels_position_;
- int64_t ReadRecords(int64_t num_records) override {
- // Delimit records, then read values at the end
- int64_t records_read = 0;
+ const T* values = reinterpret_cast<const T*>(this->values());
- if (levels_position_ < levels_written_) {
- records_read += ReadRecordData(num_records);
+ std::cout << "def levels: ";
+ for (int64_t i = 0; i < total_levels_read; ++i) {
+ std::cout << def_levels[i] << " ";
}
+ std::cout << std::endl;
- int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
-
- // If we are in the middle of a record, we continue until reaching the
- // desired number of records or the end of the current record if we've found
- // enough records
- while (!at_record_start_ || records_read < num_records) {
- // Is there more data to read in this row group?
- if (!HasNext()) {
- if (!at_record_start_) {
- // We ended the row group while inside a record that we haven't seen
- // the end of yet. So increment the record count for the last record in
- // the row group
- ++records_read;
- at_record_start_ = true;
- }
- break;
- }
-
- /// We perform multiple batch reads until we either exhaust the row group
- /// or observe the desired number of records
- int64_t batch_size = std::min(level_batch_size, available_values_current_page());
-
- // No more data in column
- if (batch_size == 0) {
- break;
- }
-
- if (max_def_level_ > 0) {
- ReserveLevels(batch_size);
-
- int16_t* def_levels = this->def_levels() + levels_written_;
- int16_t* rep_levels = this->rep_levels() + levels_written_;
-
- // Not present for non-repeated fields
- int64_t levels_read = 0;
- if (max_rep_level_ > 0) {
- levels_read = ReadDefinitionLevels(batch_size, def_levels);
- if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
- throw ParquetException("Number of decoded rep / def levels did not match");
- }
- } else if (max_def_level_ > 0) {
- levels_read = ReadDefinitionLevels(batch_size, def_levels);
- }
-
- // Exhausted column chunk
- if (levels_read == 0) {
- break;
- }
-
- levels_written_ += levels_read;
- records_read += ReadRecordData(num_records - records_read);
- } else {
- // No repetition or definition levels
- batch_size = std::min(num_records - records_read, batch_size);
- records_read += ReadRecordData(batch_size);
- }
+ std::cout << "rep levels: ";
+ for (int64_t i = 0; i < total_levels_read; ++i) {
+ std::cout << rep_levels[i] << " ";
}
+ std::cout << std::endl;
- return records_read;
+ std::cout << "values: ";
+ for (int64_t i = 0; i < this->values_written(); ++i) {
+ std::cout << values[i] << " ";
+ }
+ std::cout << std::endl;
}
private:
@@ -526,11 +557,21 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
DecoderType* current_decoder_;
// Advance to the next data page
- bool ReadNewPage();
+ bool ReadNewPage() override;
void ConfigureDictionary(const DictionaryPage* page);
};
+// TODO(wesm): Implement these to some satisfaction
+template <>
+void TypedRecordReader<Int96Type>::DebugPrintState() {}
+
+template <>
+void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
+
+template <>
+void TypedRecordReader<FLBAType>::DebugPrintState() {}
+
template <>
inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
auto values = ValuesHead<ByteArray>();
@@ -822,5 +863,7 @@ void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
impl_->SetPageReader(std::move(reader));
}
+void RecordReader::DebugPrintState() { impl_->DebugPrintState(); }
+
} // namespace internal
} // namespace parquet
diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h
index 8da0709..7efd0d5 100644
--- a/cpp/src/parquet/arrow/record_reader.h
+++ b/cpp/src/parquet/arrow/record_reader.h
@@ -104,6 +104,8 @@ class RecordReader {
/// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
void SetPageReader(std::unique_ptr<PageReader> reader);
+ void DebugPrintState();
+
private:
std::unique_ptr<RecordReaderImpl> impl_;
explicit RecordReader(RecordReaderImpl* impl);
diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h
index d425cb0..097e369 100644
--- a/cpp/src/parquet/arrow/test-util.h
+++ b/cpp/src/parquet/arrow/test-util.h
@@ -484,44 +484,6 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
EXPECT_TRUE(result->Equals(*expected_array));
}
-template <typename ParquetType>
-void PrintBufferedLevels(const RecordReader& reader) {
- using T = typename ::parquet::type_traits<ParquetType::type_num>::value_type;
-
- const int16_t* def_levels = reader.def_levels();
- const int16_t* rep_levels = reader.rep_levels();
- const int64_t total_levels_read = reader.levels_position();
-
- const T* values = reinterpret_cast<const T*>(reader.values());
-
- std::cout << "def levels: ";
- for (int64_t i = 0; i < total_levels_read; ++i) {
- std::cout << def_levels[i] << " ";
- }
- std::cout << std::endl;
-
- std::cout << "rep levels: ";
- for (int64_t i = 0; i < total_levels_read; ++i) {
- std::cout << rep_levels[i] << " ";
- }
- std::cout << std::endl;
-
- std::cout << "values: ";
- for (int64_t i = 0; i < reader.values_written(); ++i) {
- std::cout << values[i] << " ";
- }
- std::cout << std::endl;
-}
-
-template <>
-void PrintBufferedLevels<ByteArrayType>(const RecordReader& reader) {}
-
-template <>
-void PrintBufferedLevels<FLBAType>(const RecordReader& reader) {}
-
-template <>
-void PrintBufferedLevels<Int96Type>(const RecordReader& reader) {}
-
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/column_reader-test.cc b/cpp/src/parquet/column_reader-test.cc
index 273b302..60f2be2 100644
--- a/cpp/src/parquet/column_reader-test.cc
+++ b/cpp/src/parquet/column_reader-test.cc
@@ -386,5 +386,34 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
pages_.clear();
}
+TEST(TestColumnReader, DefinitionLevelsToBitmap) {
+ // Bugs in this function were exposed in ARROW-3930
+ std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3};
+ std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1};
+
+ std::vector<uint8_t> valid_bits(2, 0);
+
+ const int max_def_level = 3;
+ const int max_rep_level = 1;
+
+ int64_t values_read = -1;
+ int64_t null_count = 0;
+ internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level,
+ &values_read, &null_count, valid_bits.data(),
+ 0 /* valid_bits_offset */);
+ ASSERT_EQ(9, values_read);
+ ASSERT_EQ(1, null_count);
+
+ // Call again with 0 definition levels, make sure that valid_bits is unmodified
+ const uint8_t current_byte = valid_bits[1];
+ null_count = 0;
+ internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level,
+ &values_read, &null_count, valid_bits.data(),
+ 9 /* valid_bits_offset */);
+ ASSERT_EQ(0, values_read);
+ ASSERT_EQ(0, null_count);
+ ASSERT_EQ(current_byte, valid_bits[1]);
+}
+
} // namespace test
} // namespace parquet
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 960f210..42bf900 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -178,9 +178,11 @@ namespace internal {
static inline void DefinitionLevelsToBitmap(
const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
- uint8_t* valid_bits, const int64_t valid_bits_offset) {
+ uint8_t* valid_bits, int64_t valid_bits_offset) {
+ // We assume here that valid_bits is large enough to accommodate the
+ // additional definition levels and the ones that have already been written
::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
- num_def_levels);
+ valid_bits_offset + num_def_levels);
// TODO(itaiin): As an interim solution we are splitting the code path here
// between repeated+flat column reads, and non-repeated+nested reads.