You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:05 UTC
[11/13] incubator-impala git commit: IMPALA-5197: Erroneous corrupted
Parquet file message
IMPALA-5197: Erroneous corrupted Parquet file message
The Parquet file column reader may fail in the middle
of producing a scratch tuple batch for various reasons,
such as exceeding the memory limit or cancellation. In that
case, the scratch tuple batch may not have materialized
all the rows in a row group. We shouldn't erroneously
report that the file is corrupted in this case, as the
column reader didn't read the entire row group.
A new test case is added to verify that we won't see this
error message. A new failpoint phase, GETNEXT_SCANNER, is
also added to differentiate it from the GETNEXT phase of
the scan node itself.
Change-Id: I9138039ec60fbe9deff250b8772036e40e42e1f6
Reviewed-on: http://gerrit.cloudera.org:8080/6787
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/249632b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/249632b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/249632b3
Branch: refs/heads/master
Commit: 249632b3081e94b18361a43bd635586d6b1e0ed0
Parents: 368115c
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Apr 19 17:27:24 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 9 09:27:39 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 2 +-
be/src/exec/hdfs-scan-node-base.cc | 6 +++---
be/src/exec/hdfs-scan-node-base.h | 6 +++---
be/src/exec/hdfs-scanner.h | 4 +++-
be/src/exec/parquet-column-readers.cc | 27 ++++++++++++++++++---------
be/src/exec/parquet-column-readers.h | 11 +++++++----
common/thrift/PlanNodes.thrift | 1 +
tests/common/impala_service.py | 1 -
tests/failure/test_failpoints.py | 6 ++++--
9 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 046ec46..a6831b7 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -958,7 +958,7 @@ Status HdfsParquetScanner::AssembleRows(
scratch_batch_->num_tuples = 0;
DCHECK(scratch_batch_->AtEnd());
*skip_row_group = true;
- if (num_tuples_mismatch) {
+ if (num_tuples_mismatch && continue_execution) {
parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
"had $2 remaining values but expected $3", filename(),
col_reader->schema_element().name, last_num_tuples,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e4ba44d..b6366a9 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -680,7 +680,7 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
partition->file_format()));
}
DCHECK(scanner->get() != NULL);
- Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
+ Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
if (status.ok()) {
status = scanner->get()->Open(context);
if (!status.ok()) {
@@ -904,6 +904,6 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
}
}
-Status HdfsScanNodeBase::TriggerDebugAction() {
- return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
+ return ExecDebugAction(phase, runtime_state_);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3453945..94e8952 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -494,9 +494,9 @@ class HdfsScanNodeBase : public ScanNode {
/// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
void StopAndFinalizeCounters();
- /// Calls ExecDebugAction(). Returns the status based on the debug action specified
- /// for the query.
- Status TriggerDebugAction();
+ /// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
+ /// debug action specified for the query.
+ Status ScanNodeDebugAction(TExecNodePhase::type phase);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 417ade7..24a3c4f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -403,7 +403,9 @@ class HdfsScanner {
/// Triggers debug action of the scan node. This is currently used by parquet column
/// readers to exercise various failure paths in parquet scanner. Returns the status
/// returned by the scan node's TriggerDebugAction().
- Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); }
+ Status ScannerDebugAction() {
+ return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER);
+ }
/// Utility function to append an error message for an invalid row.
void LogRowParseError();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 53848e2..f1ac031 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -53,11 +53,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
// Also, this limit is in place to prevent impala from reading corrupt parquet files.
DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
-// Trigger debug action on every 128 tuples produced. This is useful in exercising the
-// failure or cancellation path.
+// Trigger debug action on every other call of Read*ValueBatch() once at least 128
+// tuples have been produced to simulate failure such as exceeding memory limit.
+// Triggering it every other call so as not to always fail on the first column reader
+// when materializing multiple columns. Failing on non-empty row batch tests proper
+// resources freeing by the Parquet scanner.
#ifndef NDEBUG
-#define DEBUG_ACTION_TRIGGER (127)
-#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0)
+static int debug_count = 0;
+#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
+ ((debug_count++ % 2) == 1 && num_tuples >= 128)
#else
#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
#endif
@@ -361,7 +365,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
val_count += ret_val_count;
num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= TriggerDebugAction();
+ continue_execution &= ColReaderDebugAction(&val_count);
}
}
*num_values = val_count;
@@ -765,12 +769,17 @@ class BoolColumnReader : public BaseScalarColumnReader {
BitReader bool_values_;
};
-bool ParquetColumnReader::TriggerDebugAction() {
- Status status = parent_->TriggerDebugAction();
+// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
+// path doesn't falsely report that the file is corrupted.
+bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
+#ifndef NDEBUG
+ Status status = parent_->ScannerDebugAction();
if (!status.ok()) {
if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+ *val_count = 0;
return false;
}
+#endif
return true;
}
@@ -790,7 +799,7 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
continue_execution = ReadValue(pool, tuple);
++val_count;
if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= TriggerDebugAction();
+ continue_execution &= ColReaderDebugAction(&val_count);
}
}
*num_values = val_count;
@@ -806,7 +815,7 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
continue_execution = ReadNonRepeatedValue(pool, tuple);
++val_count;
if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= TriggerDebugAction();
+ continue_execution &= ColReaderDebugAction(&val_count);
}
}
*num_values = val_count;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 9de4277..66d8815 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -288,10 +288,13 @@ class ParquetColumnReader {
if (max_rep_level() == 0) rep_level_ = 0;
}
- /// Trigger debug action. Returns false if the debug action deems that the
- /// parquet column reader should halt execution. In which case, 'parse_status_'
- /// is also updated.
- bool TriggerDebugAction();
+ /// Called in the middle of creating a scratch tuple batch to simulate failures
+ /// such as exceeding memory limit or cancellation. Returns false if the debug
+ /// action deems that the parquet column reader should halt execution. 'val_count'
+ /// is the counter which the column reader uses to track the number of tuples
+ /// produced so far. If the column reader should halt execution, 'parse_status_'
+ /// is updated with the error status and 'val_count' is set to 0.
+ bool ColReaderDebugAction(int* val_count);
};
/// Reader for a single column from the parquet file. It's associated with a
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1baffc0..e869afb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -56,6 +56,7 @@ enum TExecNodePhase {
PREPARE_SCANNER,
OPEN,
GETNEXT,
+ GETNEXT_SCANNER,
CLOSE,
INVALID
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 265b1b6..3fb73bc 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -126,7 +126,6 @@ class ImpaladService(BaseImpalaService):
(num_in_flight_queries, expected_val))
return False
-
def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1):
start_time = time()
while (time() - start_time < timeout):
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 301762b..f67afe4 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -31,7 +31,7 @@ from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
# Map debug actions to their corresponding query options' values.
FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
@@ -39,7 +39,7 @@ MT_DOP_VALUES = [0, 4]
# Queries should cover all exec nodes.
QUERIES = [
- "select * from alltypessmall",
+ "select * from alltypes",
"select count(*) from alltypessmall",
"select count(int_col) from alltypessmall group by id",
"select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
@@ -137,6 +137,8 @@ class TestFailpoints(ImpalaTestSuite):
assert 'Expected Failure'
except ImpalaBeeswaxException as e:
LOG.debug(e)
+ # IMPALA-5197: None of the debug actions should trigger corrupted file message
+ assert 'Corrupt Parquet file' not in str(e)
def __execute_cancel_action(self, query, vector):
LOG.info('Starting async query execution')