Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/04 01:11:35 UTC

[arrow] branch master updated: PARQUET-1469: [C++] Fix data corruption bug in parquet::internal::DefinitionLevelsToBitmap that was triggered through random data

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c204d3e  PARQUET-1469: [C++] Fix data corruption bug in parquet::internal::DefinitionLevelsToBitmap that was triggered through random data
c204d3e is described below

commit c204d3ec9a3cddd32bfa6c73fedd5ce0970b660a
Author: Wes McKinney <we...@apache.org>
AuthorDate: Mon Dec 3 19:11:28 2018 -0600

    PARQUET-1469: [C++] Fix data corruption bug in parquet::internal::DefinitionLevelsToBitmap that was triggered through random data
    
    I also refactored some code to aid with debugging.
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #3080 from wesm/PARQUET-1469 and squashes the following commits:
    
    5863562d0 <Wes McKinney> Fix bug in parquet::internal::DefinitionLevelsToBitmap that was triggered by the PRNG change. Some refactoring to assist with debugging
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |   9 +-
 cpp/src/parquet/arrow/record_reader.cc            | 203 +++++++++++++---------
 cpp/src/parquet/arrow/record_reader.h             |   2 +
 cpp/src/parquet/arrow/test-util.h                 |  38 ----
 cpp/src/parquet/column_reader-test.cc             |  29 ++++
 cpp/src/parquet/column_reader.h                   |   6 +-
 6 files changed, 163 insertions(+), 124 deletions(-)
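
For context on the fix at the bottom of this patch: parquet::internal::DefinitionLevelsToBitmap() decodes definition levels into a caller-owned validity bitmap, appending bits starting at valid_bits_offset. A minimal usage sketch of that contract (the wrapper function and batch sizes here are illustrative, not part of the patch):

    #include <cstdint>

    #include "parquet/column_reader.h"

    // Append validity bits for two consecutive 9-level batches. The second
    // call starts mid-byte at offset 9, which is exactly the situation the
    // corrected BitmapWriter length below must handle without clobbering
    // bits 0-8.
    void AppendTwoBatches(const int16_t* def_levels, uint8_t* valid_bits) {
      int64_t values_read = 0;
      int64_t null_count = 0;
      parquet::internal::DefinitionLevelsToBitmap(
          def_levels, 9, /*max_definition_level=*/3,
          /*max_repetition_level=*/1, &values_read, &null_count, valid_bits,
          /*valid_bits_offset=*/0);
      parquet::internal::DefinitionLevelsToBitmap(
          def_levels + 9, 9, 3, 1, &values_read, &null_count, valid_bits,
          /*valid_bits_offset=*/9);
    }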

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 8aedd38..b8eb094 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1712,6 +1712,7 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
 TEST(TestArrowReadWrite, ListLargeRecords) {
   // PARQUET-1308: This test passed on Linux when num_rows was smaller
   const int num_rows = 2000;
+  const int row_group_size = 100;
 
   std::shared_ptr<Array> list_array;
   std::shared_ptr<::DataType> list_type;
@@ -1723,8 +1724,8 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   std::shared_ptr<Table> table = Table::Make(schema, {list_array});
 
   std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, 100, default_arrow_writer_properties(), &buffer));
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             default_arrow_writer_properties(), &buffer));
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -1736,7 +1737,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
   ASSERT_OK_NO_THROW(reader->ReadTable(&result));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
 
-  // Read chunked
+  // Read 1 record at a time
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
                               ::arrow::default_memory_pool(),
                               ::parquet::default_reader_properties(), nullptr, &reader));
@@ -2263,7 +2264,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
   const int num_trees = 3;
   const int depth = 3;
 #else
-  const int num_trees = 10;
+  const int num_trees = 5;
   const int depth = 5;
 #endif
   const int num_children = 3;
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc
index ce6fa2a..4a3cd52 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <iostream>
 #include <memory>
 #include <sstream>
 #include <unordered_map>
@@ -59,6 +60,10 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
   return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
 }
 
+// The minimum number of repetition/definition levels to decode at a time, for
+// better vectorized performance when doing many smaller record reads
+constexpr int64_t kMinLevelBatchSize = 1024;
+
 class RecordReader::RecordReaderImpl {
  public:
   RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
@@ -94,7 +99,88 @@ class RecordReader::RecordReaderImpl {
 
   virtual ~RecordReaderImpl() = default;
 
-  virtual int64_t ReadRecords(int64_t num_records) = 0;
+  virtual int64_t ReadRecordData(const int64_t num_records) = 0;
+
+  // Returns true if there are still values in this column.
+  bool HasNext() {
+    // Either there is no data page available yet, or the data page has been
+    // exhausted
+    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+      if (!ReadNewPage() || num_buffered_values_ == 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  int64_t ReadRecords(int64_t num_records) {
+    // Delimit records, then read values at the end
+    int64_t records_read = 0;
+
+    if (levels_position_ < levels_written_) {
+      records_read += ReadRecordData(num_records);
+    }
+
+    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
+
+    // If we are in the middle of a record, we continue until reaching the
+    // desired number of records or the end of the current record if we've found
+    // enough records
+    while (!at_record_start_ || records_read < num_records) {
+      // Is there more data to read in this row group?
+      if (!HasNext()) {
+        if (!at_record_start_) {
+          // We ended the row group while inside a record that we haven't seen
+          // the end of yet. So increment the record count for the last record in
+          // the row group
+          ++records_read;
+          at_record_start_ = true;
+        }
+        break;
+      }
+
+      /// We perform multiple batch reads until we either exhaust the row group
+      /// or observe the desired number of records
+      int64_t batch_size = std::min(level_batch_size, available_values_current_page());
+
+      // No more data in column
+      if (batch_size == 0) {
+        break;
+      }
+
+      if (max_def_level_ > 0) {
+        ReserveLevels(batch_size);
+
+        int16_t* def_levels = this->def_levels() + levels_written_;
+        int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+        // Not present for non-repeated fields
+        int64_t levels_read = 0;
+        if (max_rep_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
+            throw ParquetException("Number of decoded rep / def levels did not match");
+          }
+        } else if (max_def_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+        }
+
+        // Exhausted column chunk
+        if (levels_read == 0) {
+          break;
+        }
+
+        levels_written_ += levels_read;
+        records_read += ReadRecordData(num_records - records_read);
+      } else {
+        // No repetition or definition levels
+        batch_size = std::min(num_records - records_read, batch_size);
+        records_read += ReadRecordData(batch_size);
+      }
+    }
+
+    return records_read;
+  }
 
   // Dictionary decoders must be reset when advancing row groups
   virtual void ResetDecoders() = 0;
@@ -303,7 +389,11 @@ class RecordReader::RecordReaderImpl {
     }
   }
 
+  virtual void DebugPrintState() = 0;
+
  protected:
+  virtual bool ReadNewPage() = 0;
+
   const ColumnDescriptor* descr_;
   ::arrow::MemoryPool* pool_;
 
@@ -359,10 +449,6 @@ class RecordReader::RecordReaderImpl {
   std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
 };
 
-// The minimum number of repetition/definition levels to decode at a time, for
-// better vectorized performance when doing many smaller record reads
-constexpr int64_t kMinLevelBatchSize = 1024;
-
 template <typename DType>
 class TypedRecordReader : public RecordReader::RecordReaderImpl {
  public:
@@ -390,7 +476,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
   }
 
   // Return number of logical records read
-  int64_t ReadRecordData(const int64_t num_records) {
+  int64_t ReadRecordData(const int64_t num_records) override {
     // Conservative upper bound
     const int64_t possible_num_values =
         std::max(num_records, levels_written_ - levels_position_);
@@ -434,85 +520,30 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     return records_read;
   }
 
-  // Returns true if there are still values in this column.
-  bool HasNext() {
-    // Either there is no data page available yet, or the data page has been
-    // exhausted
-    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
-      if (!ReadNewPage() || num_buffered_values_ == 0) {
-        return false;
-      }
-    }
-    return true;
-  }
+  void DebugPrintState() override {
+    const int16_t* def_levels = this->def_levels();
+    const int16_t* rep_levels = this->rep_levels();
+    const int64_t total_levels_read = levels_position_;
 
-  int64_t ReadRecords(int64_t num_records) override {
-    // Delimit records, then read values at the end
-    int64_t records_read = 0;
+    const T* values = reinterpret_cast<const T*>(this->values());
 
-    if (levels_position_ < levels_written_) {
-      records_read += ReadRecordData(num_records);
+    std::cout << "def levels: ";
+    for (int64_t i = 0; i < total_levels_read; ++i) {
+      std::cout << def_levels[i] << " ";
     }
+    std::cout << std::endl;
 
-    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
-
-    // If we are in the middle of a record, we continue until reaching the
-    // desired number of records or the end of the current record if we've found
-    // enough records
-    while (!at_record_start_ || records_read < num_records) {
-      // Is there more data to read in this row group?
-      if (!HasNext()) {
-        if (!at_record_start_) {
-          // We ended the row group while inside a record that we haven't seen
-          // the end of yet. So increment the record count for the last record in
-          // the row group
-          ++records_read;
-          at_record_start_ = true;
-        }
-        break;
-      }
-
-      /// We perform multiple batch reads until we either exhaust the row group
-      /// or observe the desired number of records
-      int64_t batch_size = std::min(level_batch_size, available_values_current_page());
-
-      // No more data in column
-      if (batch_size == 0) {
-        break;
-      }
-
-      if (max_def_level_ > 0) {
-        ReserveLevels(batch_size);
-
-        int16_t* def_levels = this->def_levels() + levels_written_;
-        int16_t* rep_levels = this->rep_levels() + levels_written_;
-
-        // Not present for non-repeated fields
-        int64_t levels_read = 0;
-        if (max_rep_level_ > 0) {
-          levels_read = ReadDefinitionLevels(batch_size, def_levels);
-          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
-            throw ParquetException("Number of decoded rep / def levels did not match");
-          }
-        } else if (max_def_level_ > 0) {
-          levels_read = ReadDefinitionLevels(batch_size, def_levels);
-        }
-
-        // Exhausted column chunk
-        if (levels_read == 0) {
-          break;
-        }
-
-        levels_written_ += levels_read;
-        records_read += ReadRecordData(num_records - records_read);
-      } else {
-        // No repetition or definition levels
-        batch_size = std::min(num_records - records_read, batch_size);
-        records_read += ReadRecordData(batch_size);
-      }
+    std::cout << "rep levels: ";
+    for (int64_t i = 0; i < total_levels_read; ++i) {
+      std::cout << rep_levels[i] << " ";
     }
+    std::cout << std::endl;
 
-    return records_read;
+    std::cout << "values: ";
+    for (int64_t i = 0; i < this->values_written(); ++i) {
+      std::cout << values[i] << " ";
+    }
+    std::cout << std::endl;
   }
 
  private:
@@ -526,11 +557,21 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
   DecoderType* current_decoder_;
 
   // Advance to the next data page
-  bool ReadNewPage();
+  bool ReadNewPage() override;
 
   void ConfigureDictionary(const DictionaryPage* page);
 };
 
+// TODO(wesm): Implement these to some satisfaction
+template <>
+void TypedRecordReader<Int96Type>::DebugPrintState() {}
+
+template <>
+void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
+
+template <>
+void TypedRecordReader<FLBAType>::DebugPrintState() {}
+
 template <>
 inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
   auto values = ValuesHead<ByteArray>();
@@ -822,5 +863,7 @@ void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
   impl_->SetPageReader(std::move(reader));
 }
 
+void RecordReader::DebugPrintState() { impl_->DebugPrintState(); }
+
 }  // namespace internal
 }  // namespace parquet
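
The ReadRecords() loop hoisted into RecordReaderImpl above delimits records by repetition level: a new record begins wherever the repetition level drops back to 0, which is why a row group that ends mid-record still counts one final record. A standalone sketch of that rule (illustrative only, not the library API):

    #include <cstdint>
    #include <vector>

    // Count logical records in a buffer of repetition levels: rep level 0
    // marks the first level of a new record, so e.g. {0, 1, 1, 0, 1} holds
    // two records.
    int64_t CountRecords(const std::vector<int16_t>& rep_levels) {
      int64_t records = 0;
      for (int16_t level : rep_levels) {
        if (level == 0) ++records;
      }
      return records;
    }
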
diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h
index 8da0709..7efd0d5 100644
--- a/cpp/src/parquet/arrow/record_reader.h
+++ b/cpp/src/parquet/arrow/record_reader.h
@@ -104,6 +104,8 @@ class RecordReader {
   /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
   void SetPageReader(std::unique_ptr<PageReader> reader);
 
+  void DebugPrintState();
+
  private:
   std::unique_ptr<RecordReaderImpl> impl_;
   explicit RecordReader(RecordReaderImpl* impl);
diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h
index d425cb0..097e369 100644
--- a/cpp/src/parquet/arrow/test-util.h
+++ b/cpp/src/parquet/arrow/test-util.h
@@ -484,44 +484,6 @@ void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
   EXPECT_TRUE(result->Equals(*expected_array));
 }
 
-template <typename ParquetType>
-void PrintBufferedLevels(const RecordReader& reader) {
-  using T = typename ::parquet::type_traits<ParquetType::type_num>::value_type;
-
-  const int16_t* def_levels = reader.def_levels();
-  const int16_t* rep_levels = reader.rep_levels();
-  const int64_t total_levels_read = reader.levels_position();
-
-  const T* values = reinterpret_cast<const T*>(reader.values());
-
-  std::cout << "def levels: ";
-  for (int64_t i = 0; i < total_levels_read; ++i) {
-    std::cout << def_levels[i] << " ";
-  }
-  std::cout << std::endl;
-
-  std::cout << "rep levels: ";
-  for (int64_t i = 0; i < total_levels_read; ++i) {
-    std::cout << rep_levels[i] << " ";
-  }
-  std::cout << std::endl;
-
-  std::cout << "values: ";
-  for (int64_t i = 0; i < reader.values_written(); ++i) {
-    std::cout << values[i] << " ";
-  }
-  std::cout << std::endl;
-}
-
-template <>
-void PrintBufferedLevels<ByteArrayType>(const RecordReader& reader) {}
-
-template <>
-void PrintBufferedLevels<FLBAType>(const RecordReader& reader) {}
-
-template <>
-void PrintBufferedLevels<Int96Type>(const RecordReader& reader) {}
-
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/column_reader-test.cc b/cpp/src/parquet/column_reader-test.cc
index 273b302..60f2be2 100644
--- a/cpp/src/parquet/column_reader-test.cc
+++ b/cpp/src/parquet/column_reader-test.cc
@@ -386,5 +386,34 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
   pages_.clear();
 }
 
+TEST(TestColumnReader, DefinitionLevelsToBitmap) {
+  // Bugs in this function were exposed in ARROW-3930
+  std::vector<int16_t> def_levels = {3, 3, 3, 2, 3, 3, 3, 3, 3};
+  std::vector<int16_t> rep_levels = {0, 1, 1, 1, 1, 1, 1, 1, 1};
+
+  std::vector<uint8_t> valid_bits(2, 0);
+
+  const int max_def_level = 3;
+  const int max_rep_level = 1;
+
+  int64_t values_read = -1;
+  int64_t null_count = 0;
+  internal::DefinitionLevelsToBitmap(def_levels.data(), 9, max_def_level, max_rep_level,
+                                     &values_read, &null_count, valid_bits.data(),
+                                     0 /* valid_bits_offset */);
+  ASSERT_EQ(9, values_read);
+  ASSERT_EQ(1, null_count);
+
+  // Call again with 0 definition levels, make sure that valid_bits is unmodified
+  const uint8_t current_byte = valid_bits[1];
+  null_count = 0;
+  internal::DefinitionLevelsToBitmap(def_levels.data(), 0, max_def_level, max_rep_level,
+                                     &values_read, &null_count, valid_bits.data(),
+                                     9 /* valid_bits_offset */);
+  ASSERT_EQ(0, values_read);
+  ASSERT_EQ(0, null_count);
+  ASSERT_EQ(current_byte, valid_bits[1]);
+}
+
 }  // namespace test
 }  // namespace parquet
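
To make the first assertion block above concrete: with max_def_level 3 and max_rep_level 1, a definition level of 3 marks a present value and a level of 2 a null slot, so the nine levels {3, 3, 3, 2, 3, 3, 3, 3, 3} should yield the LSB-first bit pattern 1 1 1 0 1 1 1 1 1, i.e. valid_bits[0] == 0xF7 with the low bit of valid_bits[1] set, and null_count == 1. A self-contained sketch of that mapping for the repeated, flat case (illustrative, not the production code path):

    #include <cstdint>
    #include <vector>

    // Map definition levels to validity bits for a repeated, flat column:
    //   def == max_def      -> present value (set bit)
    //   def == max_def - 1  -> null slot (clear bit, count a null)
    //   def <  max_def - 1  -> no value slot consumed (e.g. empty list)
    std::vector<uint8_t> LevelsToValidity(
        const std::vector<int16_t>& def_levels, int16_t max_def_level,
        int64_t* null_count) {
      std::vector<uint8_t> bits((def_levels.size() + 7) / 8, 0);
      int64_t pos = 0;
      for (int16_t def : def_levels) {
        if (def == max_def_level) {
          bits[pos / 8] |= static_cast<uint8_t>(1 << (pos % 8));
        } else if (def == max_def_level - 1) {
          ++*null_count;  // bit stays clear
        } else {
          continue;  // no slot consumed
        }
        ++pos;
      }
      return bits;
    }
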
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 960f210..42bf900 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -178,9 +178,11 @@ namespace internal {
 static inline void DefinitionLevelsToBitmap(
     const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
     const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
-    uint8_t* valid_bits, const int64_t valid_bits_offset) {
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // We assume here that valid_bits is large enough to accommodate the
+  // additional definition levels and the ones that have already been written
   ::arrow::internal::BitmapWriter valid_bits_writer(valid_bits, valid_bits_offset,
-                                                    num_def_levels);
+                                                    valid_bits_offset + num_def_levels);
 
   // TODO(itaiin): As an interim solution we are splitting the code path here
   // between repeated+flat column reads, and non-repeated+nested reads.
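
The one-line length change above is the actual data-corruption fix. The regression test added in column_reader-test.cc suggests the failure mode: a writer constructed with length num_def_levels believes the bitmap holds no prior bits, so a call landing mid-byte (here, zero new levels at offset 9) can store back an unloaded current byte and wipe previously written bits. Sizing the writer as valid_bits_offset + num_def_levels makes it load, and therefore preserve, the partially filled trailing byte. A toy model of that pitfall (deliberately simplified, not Arrow's BitmapWriter):

    #include <cstdint>
    #include <cstdio>

    struct ToyBitmapWriter {
      uint8_t* bitmap;
      int64_t byte_offset;
      uint8_t current_byte;

      // `length` counts every valid bit in the bitmap, previously written
      // ones included; length == 0 is taken to mean "nothing to preserve".
      ToyBitmapWriter(uint8_t* bits, int64_t start_offset, int64_t length)
          : bitmap(bits), byte_offset(start_offset / 8), current_byte(0) {
        if (length > 0) {
          current_byte = bitmap[byte_offset];  // keep already-written bits
        }
      }
      void Finish() { bitmap[byte_offset] = current_byte; }
    };

    int main() {
      uint8_t valid_bits[2] = {0xF7, 0x01};  // bit 8 set by an earlier call

      // Buggy pattern: length = number of new bits (0); the trailing byte
      // is never loaded, so Finish() zeroes it.
      ToyBitmapWriter buggy(valid_bits, /*start_offset=*/9, /*length=*/0);
      buggy.Finish();
      std::printf("buggy: 0x%02X\n", valid_bits[1]);  // prints 0x00

      valid_bits[1] = 0x01;
      // Fixed pattern: length = offset + new bits (9 + 0); byte preserved.
      ToyBitmapWriter fixed(valid_bits, /*start_offset=*/9, /*length=*/9);
      fixed.Finish();
      std::printf("fixed: 0x%02X\n", valid_bits[1]);  // prints 0x01
      return 0;
    }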