Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/20 20:44:07 UTC

[09/21] impala git commit: IMPALA-6589: remove invalid DCHECK in parquet reader

IMPALA-6589: remove invalid DCHECK in parquet reader

The DCHECK was only valid when the Parquet file metadata is
internally consistent, i.e. when the number of values reported by
the metadata matches the number of encoded levels.

The DCHECK was intended to directly detect misuse of the RleBatchDecoder
interface, which would lead to incorrect results. However, our other
test coverage for reading Parquet files is sufficient to test the
correctness of level decoding.
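
A minimal sketch of the pattern the fix follows (hypothetical names, not
Impala's actual classes): a value count taken from the page header is
untrusted input, so running out of encoded levels must surface as a query
error rather than a DCHECK, which only guards internal invariants.

// Sketch only: LevelDecoder, kInvalidLevel and ReadColumnValues are
// illustrative stand-ins, not Impala's real API.
#include <cstdio>

constexpr int kInvalidLevel = -1;  // sentinel for "no more encoded levels"

// Hypothetical decoder holding however many levels were actually encoded.
struct LevelDecoder {
  int remaining;
  int ReadLevel() { return remaining-- > 0 ? 1 : kInvalidLevel; }
};

// 'num_values' comes from the page header, i.e. from untrusted file metadata.
bool ReadColumnValues(LevelDecoder* levels, int num_values) {
  for (int i = 0; i < num_values; ++i) {
    int def_level = levels->ReadLevel();
    if (def_level < 0) {
      // Metadata claimed more values than levels were encoded: report the
      // file as corrupt instead of DCHECKing on external input.
      std::fprintf(stderr, "Corrupt Parquet file: could not read all def levels\n");
      return false;
    }
    // ... materialize the value for def_level ...
  }
  return true;
}

int main() {
  LevelDecoder levels{1};          // only one def level actually encoded
  ReadColumnValues(&levels, 2);    // page header claims two values (IMPALA-6589)
  return 0;
}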

Testing:
Added a minimal corrupt test file that reproduces the issue.

Change-Id: Idd6e09f8c8cca8991be5b5b379f6420adaa97daa
Reviewed-on: http://gerrit.cloudera.org:8080/9556
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8a861541
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8a861541
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8a861541

Branch: refs/heads/2.x
Commit: 8a861541d3528ba579e9853637dbafe9efe58c07
Parents: 614d0ff
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Mar 8 09:32:45 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sun Mar 18 21:03:22 2018 +0000

----------------------------------------------------------------------
 be/src/exec/parquet-column-readers.cc           |  25 ++++++++++++++++---
 be/src/exec/parquet-column-readers.h            |   6 +++++
 testdata/data/README                            |   6 +++++
 .../data/num_values_def_levels_mismatch.parquet | Bin 0 -> 252 bytes
 .../parquet-num-values-def-levels-mismatch.test |   6 +++++
 tests/query_test/test_scanners.py               |  11 ++++++++
 6 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 2939691..09842c5 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -180,7 +180,7 @@ bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
   DCHECK(!CacheHasNext());
   DCHECK(num_cached_levels != nullptr);
   DCHECK_GE(max_level_, 0);
-  DCHECK_EQ(num_cached_levels_ % 32, 0) << "Last batch was not a multiple of 32";
+  DCHECK_GE(*num_cached_levels, 0);
   cached_level_idx_ = 0;
   if (max_level_ == 0) {
     // No levels to read, e.g., because the field is required. The cache was
@@ -593,6 +593,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
         slot_desc_->type().DebugString(), stream_->file_offset());
   }
+
   void __attribute__((noinline)) SetPlainDecodeError() {
     parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
         slot_desc_->type().DebugString(), stream_->file_offset());
@@ -1202,15 +1203,17 @@ bool BaseScalarColumnReader::NextLevels() {
   // The compiler can optimize these two conditions into a single branch by treating
   // def_level_ as unsigned.
   if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
-    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
-        "invalid def level $1 > max def level $2 for column '$3'", filename(),
-        def_level_, max_def_level(), schema_element().name)));
+    SetLevelDecodeError("def", def_level_, max_def_level());
     return false;
   }
 
   if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
     // Repetition level is only present if this column is nested in any collection type.
     rep_level_ = rep_levels_.ReadLevel();
+    if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
+      SetLevelDecodeError("rep", rep_level_, max_rep_level());
+      return false;
+    }
     // Reset position counter if we are at the start of a new parent collection.
     if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
   }
@@ -1232,6 +1235,20 @@ bool BaseScalarColumnReader::NextPage() {
   return true;
 }
 
+void BaseScalarColumnReader::SetLevelDecodeError(const char* level_name,
+    int decoded_level, int max_level) {
+  if (decoded_level < 0) {
+    DCHECK_EQ(decoded_level, HdfsParquetScanner::INVALID_LEVEL);
+    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
+        "could not read all $1 levels for column '$2'", filename(),
+        level_name, schema_element().name)));
+  } else {
+    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
+        "invalid $1 level $2 > max $1 level $3 for column '$4'", filename(),
+        level_name, decoded_level, max_level, schema_element().name)));
+  }
+}
+
 bool CollectionColumnReader::NextLevels() {
   DCHECK(!children_.empty());
   DCHECK_LE(rep_level_, new_collection_rep_level());
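
The comment in NextLevels() above notes that the two range checks can compile
to a single branch by treating the level as unsigned. A small stand-alone
illustration of that folding, assuming max_level is non-negative (names are
placeholders, not Impala's code):

#include <cstdint>

// One-comparison form: casting a negative int to uint32_t produces a huge
// value, which fails the '>' test the same way 'level < 0' would.
inline bool LevelOutOfRange(int level, int max_level) {
  return static_cast<uint32_t>(level) > static_cast<uint32_t>(max_level);
}

// Equivalent two-branch form that the compiler is expected to fold into the
// version above, given max_level >= 0.
inline bool LevelOutOfRangeExplicit(int level, int max_level) {
  return level < 0 || level > max_level;
}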

http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 5186813..6b92ccb 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -506,6 +506,12 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
         && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
   }
+
+  /// Slow-path status construction code for def/rep decoding errors. 'level_name' is
+  /// either "rep" or "def", 'decoded_level' is the value returned from
+  /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value.
+  void __attribute__((noinline)) SetLevelDecodeError(const char* level_name,
+      int decoded_level, int max_level);
 };
 
 /// Collections are not materialized directly in parquet files; only scalar values appear
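
The __attribute__((noinline)) on SetLevelDecodeError, like on the
SetPlainDecodeError helper shown earlier, keeps rarely-taken status
construction out of the hot decoding loop. A minimal sketch of that slow-path
pattern with placeholder types (not Impala's real Status or reader classes):

#include <string>

struct Status { std::string msg; };  // stand-in for Impala's Status

class ReaderSketch {
 public:
  // Hot path: the range check is cheap and usually false; the out-of-line
  // call happens only on corrupt input, so this caller stays small and
  // inline-friendly.
  bool CheckDefLevel(int level, int max_level) {
    if (__builtin_expect(level < 0 || level > max_level, 0)) {
      SetLevelDecodeError("def", level, max_level);
      return false;
    }
    return true;
  }

  const Status& parse_status() const { return parse_status_; }

 private:
  // Cold path: building the error string is comparatively expensive, so it is
  // kept out of line to avoid bloating every call site.
  void __attribute__((noinline)) SetLevelDecodeError(const char* level_name,
      int decoded_level, int max_level) {
    parse_status_.msg = std::string("invalid ") + level_name + " level " +
        std::to_string(decoded_level) + " > max " + std::to_string(max_level);
  }

  Status parse_status_;
};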

http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index f02e70b..928d435 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -162,3 +162,9 @@ compression codec. The data in the file is the whole of the "alltypestiny" data
 the same columns: id int, bool_col boolean, tinyint_col tinyint, smallint_col smallint,
 int_col int, bigint_col bigint, float_col float, double_col double,
 date_string_col string, string_col string, timestamp_col timestamp, year int, month int
+
+num_values_def_levels_mismatch.parquet:
+A file with a single boolean column with page metadata reporting 2 values but only def
+levels for a single literal value. Generated by hacking Impala's parquet writer to
+increment page.header.data_page_header.num_values. This caused Impala to hit a DCHECK
+(IMPALA-6589).
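
The README entry above says the file was produced by patching Impala's writer
to over-report num_values. A hedged sketch of that kind of one-off hack, using
placeholder structs rather than the real parquet Thrift types or Impala's
writer code:

#include <cstdint>

// Illustrative stand-ins for the parquet Thrift page header structs.
struct DataPageHeader { int32_t num_values; };
struct PageHeader { DataPageHeader data_page_header; };

// Bump the reported value count just before the header is serialized, so the
// page claims one more value than it has encoded def levels for.
void CorruptNumValues(PageHeader* header) {
  header->data_page_header.num_values += 1;
}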

http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/testdata/data/num_values_def_levels_mismatch.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/num_values_def_levels_mismatch.parquet b/testdata/data/num_values_def_levels_mismatch.parquet
new file mode 100644
index 0000000..92f6e60
Binary files /dev/null and b/testdata/data/num_values_def_levels_mismatch.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
new file mode 100644
index 0000000..47fbc1a
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-num-values-def-levels-mismatch.test
@@ -0,0 +1,6 @@
+====
+---- QUERY
+select * from num_values_def_levels_mismatch
+---- CATCH
+could not read all def levels for column '_c0'
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/8a861541/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index a0c36c7..baa031e 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -430,6 +430,17 @@ class TestParquet(ImpalaTestSuite):
         "/testdata/data/bad_codec.parquet", tbl_loc])
     self.run_test_case('QueryTest/parquet-bad-codec', vector, unique_database)
 
+  def test_num_values_def_levels_mismatch(self, vector, unique_database):
+    """IMPALA-6589: test the bad num_values handled correctly. """
+    self.client.execute(("""CREATE TABLE {0}.num_values_def_levels_mismatch (_c0 BOOLEAN)
+        STORED AS PARQUET""").format(unique_database))
+    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
+        "num_values_def_levels_mismatch"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/data/num_values_def_levels_mismatch.parquet", tbl_loc])
+    self.run_test_case('QueryTest/parquet-num-values-def-levels-mismatch',
+        vector, unique_database)
+
   @SkipIfS3.hdfs_block_size
   @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size