Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:05 UTC

[11/13] incubator-impala git commit: IMPALA-5197: Erroneous corrupted Parquet file message

IMPALA-5197: Erroneous corrupted Parquet file message

The Parquet column reader may fail in the middle of
producing a scratch tuple batch for various reasons,
such as exceeding the memory limit or cancellation.
In that case, the scratch tuple batch may not have
materialized all the rows in the row group, and we
shouldn't erroneously report that the file is
corrupted, as the column reader didn't read the
entire row group to completion.
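
To make the control flow concrete, here is a minimal
standalone sketch (plain Python, not the actual scanner
code; names loosely mirror AssembleRows() in the diff
below) of the guarded corruption check:

  # Standalone model of the fix: a tuple-count mismatch is reported
  # as corruption only when execution was NOT aborted mid-row-group.
  def check_row_group(expected_tuples, produced_tuples, continue_execution):
      num_tuples_mismatch = produced_tuples != expected_tuples
      if num_tuples_mismatch and continue_execution:
          # The reader ran to completion and still came up short:
          # this really does indicate a corrupt file.
          return ('Corrupt Parquet file: had %d remaining values but '
                  'expected %d' % (produced_tuples, expected_tuples))
      # A mismatch with continue_execution == False just means the read
      # was cut short (mem limit, cancellation, debug action), not corruption.
      return None

  assert check_row_group(1024, 512, continue_execution=False) is None
  assert check_row_group(1024, 512, continue_execution=True) is not None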

A new test case is added to verify that we won't see this
error message. A new failpoint phase, GETNEXT_SCANNER, is
also added to differentiate failures injected inside the
scanner from those injected at the scan node's own GETNEXT
phase.
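
As a rough illustration of how the new phase is driven
(hypothetical snippet; 'client' and its methods are
illustrative, but the debug_action format matches
tests/failure/test_failpoints.py below):

  # Inject a simulated mem-limit failure at the new GETNEXT_SCANNER phase
  # of plan node 0. Format: '<plan node id>:<phase>:<action>'.
  client.set_configuration({'debug_action': '0:GETNEXT_SCANNER:MEM_LIMIT_EXCEEDED'})
  try:
      client.execute('select * from functional_parquet.alltypes')
      assert False, 'Expected the injected failure'
  except Exception as e:
      # The query fails, but its message must not claim file corruption.
      assert 'Corrupt Parquet file' not in str(e)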

Change-Id: I9138039ec60fbe9deff250b8772036e40e42e1f6
Reviewed-on: http://gerrit.cloudera.org:8080/6787
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/249632b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/249632b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/249632b3

Branch: refs/heads/master
Commit: 249632b3081e94b18361a43bd635586d6b1e0ed0
Parents: 368115c
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 19 17:27:24 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 9 09:27:39 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   |  2 +-
 be/src/exec/hdfs-scan-node-base.cc    |  6 +++---
 be/src/exec/hdfs-scan-node-base.h     |  6 +++---
 be/src/exec/hdfs-scanner.h            |  4 +++-
 be/src/exec/parquet-column-readers.cc | 27 ++++++++++++++++++---------
 be/src/exec/parquet-column-readers.h  | 11 +++++++----
 common/thrift/PlanNodes.thrift        |  1 +
 tests/common/impala_service.py        |  1 -
 tests/failure/test_failpoints.py      |  6 ++++--
 9 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 046ec46..a6831b7 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -958,7 +958,7 @@ Status HdfsParquetScanner::AssembleRows(
         scratch_batch_->num_tuples = 0;
         DCHECK(scratch_batch_->AtEnd());
         *skip_row_group = true;
-        if (num_tuples_mismatch) {
+        if (num_tuples_mismatch && continue_execution) {
           parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
               "had $2 remaining values but expected $3", filename(),
               col_reader->schema_element().name, last_num_tuples,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e4ba44d..b6366a9 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -680,7 +680,7 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
           partition->file_format()));
   }
   DCHECK(scanner->get() != NULL);
-  Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
+  Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
   if (status.ok()) {
     status = scanner->get()->Open(context);
     if (!status.ok()) {
@@ -904,6 +904,6 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   }
 }
 
-Status HdfsScanNodeBase::TriggerDebugAction() {
-  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
+  return ExecDebugAction(phase, runtime_state_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3453945..94e8952 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -494,9 +494,9 @@ class HdfsScanNodeBase : public ScanNode {
   /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
   void StopAndFinalizeCounters();
 
-  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
-  /// for the query.
-  Status TriggerDebugAction();
+  /// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
+  /// debug action specified for the query.
+  Status ScanNodeDebugAction(TExecNodePhase::type phase);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 417ade7..24a3c4f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -403,7 +403,9 @@ class HdfsScanner {
   /// Triggers the debug action of the scan node. This is currently used by parquet
   /// column readers to exercise various failure paths in the parquet scanner. Returns
   /// the status returned by the scan node's ScanNodeDebugAction().
-  Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); }
+  Status ScannerDebugAction() {
+    return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER);
+  }
 
   /// Utility function to append an error message for an invalid row.
   void LogRowParseError();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 53848e2..f1ac031 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -53,11 +53,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
 // Also, this limit is in place to prevent impala from reading corrupt parquet files.
 DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
 
-// Trigger debug action on every 128 tuples produced. This is useful in exercising the
-// failure or cancellation path.
+// Trigger the debug action on every other call of Read*ValueBatch() once at least
+// 128 tuples have been produced, to simulate failures such as exceeding the memory
+// limit. It's triggered on every other call so that the first column reader doesn't
+// always fail when materializing multiple columns. Failing on a non-empty row batch
+// tests proper resource freeing by the Parquet scanner.
 #ifndef NDEBUG
-#define DEBUG_ACTION_TRIGGER (127)
-#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0)
+static int debug_count = 0;
+#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
+    ((debug_count++ % 2) == 1 && num_tuples >= 128)
 #else
 #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
 #endif
@@ -361,7 +365,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       val_count += ret_val_count;
       num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
       if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-        continue_execution &= TriggerDebugAction();
+        continue_execution &= ColReaderDebugAction(&val_count);
       }
     }
     *num_values = val_count;
@@ -765,12 +769,17 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BitReader bool_values_;
 };
 
-bool ParquetColumnReader::TriggerDebugAction() {
-  Status status = parent_->TriggerDebugAction();
+// Sets 'val_count' to zero on failure to exercise IMPALA-5197. This verifies that the
+// error handling path doesn't falsely report that the file is corrupted.
+bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
+#ifndef NDEBUG
+  Status status = parent_->ScannerDebugAction();
   if (!status.ok()) {
     if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+    *val_count = 0;
     return false;
   }
+#endif
   return true;
 }
 
@@ -790,7 +799,7 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= TriggerDebugAction();
+      continue_execution &= ColReaderDebugAction(&val_count);
     }
   }
   *num_values = val_count;
@@ -806,7 +815,7 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     continue_execution = ReadNonRepeatedValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= TriggerDebugAction();
+      continue_execution &= ColReaderDebugAction(&val_count);
     }
   }
   *num_values = val_count;
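
Since the new trigger predicate defined earlier in this
file's diff is easy to misread, here is a standalone model
(Python, illustrative only) of when the debug action fires:

  # Model of the SHOULD_TRIGGER_DEBUG_ACTION macro above. C++'s
  # post-increment means the OLD value of debug_count decides.
  debug_count = 0

  def should_trigger_debug_action(num_tuples):
      global debug_count
      old_count = debug_count
      debug_count += 1
      # Fires on every other evaluation, and only once at least 128
      # tuples have been produced by the current column reader.
      return old_count % 2 == 1 and num_tuples >= 128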

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 9de4277..66d8815 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -288,10 +288,13 @@ class ParquetColumnReader {
     if (max_rep_level() == 0) rep_level_ = 0;
   }
 
-  /// Trigger debug action. Returns false if the debug action deems that the
-  /// parquet column reader should halt execution. In which case, 'parse_status_'
-  /// is also updated.
-  bool TriggerDebugAction();
+  /// Called in the middle of creating a scratch tuple batch to simulate failures
+  /// such as exceeding the memory limit or cancellation. Returns false if the debug
+  /// action deems that the parquet column reader should halt execution. 'val_count'
+  /// is the counter which the column reader uses to track the number of tuples
+  /// produced so far. If the column reader should halt execution, 'parse_status_'
+  /// is updated with the error status and 'val_count' is set to 0.
+  bool ColReaderDebugAction(int* val_count);
 };
 
 /// Reader for a single column from the parquet file.  It's associated with a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1baffc0..e869afb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -56,6 +56,7 @@ enum TExecNodePhase {
   PREPARE_SCANNER,
   OPEN,
   GETNEXT,
+  GETNEXT_SCANNER,
   CLOSE,
   INVALID
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 265b1b6..3fb73bc 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -126,7 +126,6 @@ class ImpaladService(BaseImpalaService):
         (num_in_flight_queries, expected_val))
     return False
 
-
   def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1):
     start_time = time()
     while (time() - start_time < timeout):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 301762b..f67afe4 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -31,7 +31,7 @@ from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 
 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
@@ -39,7 +39,7 @@ MT_DOP_VALUES = [0, 4]
 
 # Queries should cover all exec nodes.
 QUERIES = [
-  "select * from alltypessmall",
+  "select * from alltypes",
   "select count(*) from alltypessmall",
   "select count(int_col) from alltypessmall group by id",
   "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
@@ -137,6 +137,8 @@ class TestFailpoints(ImpalaTestSuite):
       assert 'Expected Failure'
     except ImpalaBeeswaxException as e:
       LOG.debug(e)
+      # IMPALA-5197: None of the debug actions should trigger the corrupted file message.
+      assert 'Corrupt Parquet file' not in str(e)
 
   def __execute_cancel_action(self, query, vector):
     LOG.info('Starting async query execution')