You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/27 05:19:03 UTC
[5/5] impala git commit: IMPALA-7662: fix error race when scanner
open fails
IMPALA-7662: fix error race when scanner open fails
This is very similar to IMPALA-7335, except that it happens
when 'progress_' is incremented in the call chain
HdfsScanNode::ProcessSplit
-> HdfsScanNodeBase::CreateAndOpenScanner()
-> HdfsScanner::Close()
The fix restructures the code so that
SetDoneInternal() is called with the error *before*
HdfsScanner::Close(). This needed some refactoring because the scanner
was previously closed on open failure inside HdfsScanNodeBase, which
doesn't know about SetDoneInternal() (it is defined in HdfsScanNode).
My fix is to put the common logic between HdfsScanNode and
HdfsScanNodeMt into a helper in HdfsScanNodeBase, then in
HdfsScanNode, make sure to call SetDoneInternal() before
closing the scanner.
I also reworked HdfsScanNode::ProcessSplit() to handle error propagation
internally; it now returns void. I think splitting the responsibility for
handling errors between ProcessSplit() and its caller made the code harder
to follow than necessary.
Testing:
Added a SCANNER_ERROR debug phase, a DELAY debug action, and a test that
used them to reproduce the race before the fix.
Change-Id: I45a61210ca7d057b048c77d9f2f2695ec450f19b
Reviewed-on: http://gerrit.cloudera.org:8080/11596
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/449fe73d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/449fe73d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/449fe73d
Branch: refs/heads/master
Commit: 449fe73d2145bd22f0f857623c3652a097f06d73
Parents: 2e5d658
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 5 10:13:36 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 27 04:07:51 2018 +0000
----------------------------------------------------------------------
be/src/exec/exec-node.cc | 7 ++--
be/src/exec/hdfs-scan-node-base.cc | 22 ++++--------
be/src/exec/hdfs-scan-node-base.h | 11 ++++--
be/src/exec/hdfs-scan-node-mt.cc | 13 ++++++-
be/src/exec/hdfs-scan-node-mt.h | 6 ++++
be/src/exec/hdfs-scan-node.cc | 36 ++++++++++++--------
be/src/exec/hdfs-scan-node.h | 12 +++++--
be/src/exec/hdfs-scanner.h | 2 ++
common/thrift/PlanNodes.thrift | 5 +++
.../parquet-error-propagation-race.test | 13 +++++++
tests/failure/test_failpoints.py | 5 ++-
tests/query_test/test_scanners.py | 15 ++++++++
12 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index b9406dd..3a606f5 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -392,8 +392,7 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
return Status::OK();
} else if (debug_options_.action == TDebugAction::MEM_LIMIT_EXCEEDED) {
return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
- } else {
- DCHECK_EQ(debug_options_.action, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+ } else if (debug_options_.action == TDebugAction::SET_DENY_RESERVATION_PROBABILITY) {
// We can only enable the debug action if the buffer pool client is registered.
// If the buffer client is not registered at this point (e.g. if phase is PREPARE or
// OPEN), then we will enable the debug action at the time when the client is
@@ -401,6 +400,10 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
if (reservation_manager_.buffer_pool_client()->is_registered()) {
RETURN_IF_ERROR(reservation_manager_.EnableDenyReservationDebugAction());
}
+ } else {
+ DCHECK_EQ(debug_options_.action, TDebugAction::DELAY);
+ VLOG(1) << "DEBUG_ACTION: Sleeping";
+ SleepForMs(100);
}
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b11dcdc..4a39124 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -624,9 +624,10 @@ void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
return it->second;
}
-Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
- DCHECK(context != NULL);
+ DCHECK(context != nullptr);
+ DCHECK(scanner->get() == nullptr);
THdfsCompression::type compression =
context->GetStream()->file_desc()->file_compression;
@@ -663,19 +664,10 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
return Status(Substitute("Unknown Hdfs file format type: $0",
partition->file_format()));
}
- DCHECK(scanner->get() != NULL);
- Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
- if (status.ok()) {
- status = scanner->get()->Open(context);
- if (!status.ok()) {
- scanner->get()->Close(nullptr);
- scanner->reset();
- }
- } else {
- context->ClearStreams();
- scanner->reset();
- }
- return status;
+ DCHECK(scanner->get() != nullptr);
+ RETURN_IF_ERROR(scanner->get()->Open(context));
+ // Inject the error after the scanner is opened, to test the scanner close path.
+ return ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
}
Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index b63d9c5..0a5a328 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -556,9 +556,14 @@ class HdfsScanNodeBase : public ScanNode {
/// buffers.
Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
- /// Create and open new scanner for this partition type.
- /// If the scanner is successfully created and opened, it is returned in 'scanner'.
- Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+ /// Helper for the CreateAndOpenScanner() implementations in the subclass. Creates and
+ /// opens a new scanner for this partition type. Depending on the outcome, the
+ /// behaviour differs:
+ /// - If the scanner is successfully created and opened, returns OK and sets *scanner.
+ /// - If the scanner cannot be created, returns an error and does not set *scanner.
+ /// - If the scanner is created but opening fails, returns an error and sets *scanner.
+ /// The caller is then responsible for closing the scanner.
+ Status CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
WARN_UNUSED_RESULT;
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index f3a2253..4e59e0a 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -27,8 +27,9 @@
#include "gen-cpp/PlanNodes_types.h"
+#include "common/names.h"
+
using namespace impala::io;
-using std::stringstream;
namespace impala {
@@ -126,6 +127,16 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
return Status::OK();
}
+Status HdfsScanNodeMt::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+ ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
+ Status status = CreateAndOpenScannerHelper(partition, context, scanner);
+ if (!status.ok() && scanner->get() != nullptr) {
+ scanner->get()->Close(nullptr);
+ scanner->reset();
+ }
+ return status;
+}
+
void HdfsScanNodeMt::Close(RuntimeState* state) {
if (is_closed()) return;
if (scanner_.get() != nullptr) scanner_->Close(nullptr);
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 78d7718..22c3d46 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -49,6 +49,12 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
virtual bool HasRowBatchQueue() const override { return false; }
private:
+ /// Create and open new scanner for this partition type.
+ /// If the scanner is successfully created and opened, it is returned in 'scanner'.
+ Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+ ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
+ WARN_UNUSED_RESULT;
+
/// Current scan range and corresponding scanner.
io::ScanRange* scan_range_;
boost::scoped_ptr<ScannerContext> scanner_ctx_;
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 6c01bf4..04b6589 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -338,6 +338,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
// Abort the query. This is still holding the lock_, so done_ is known to be
// false and status_ must be ok.
+ discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
SetDoneInternal(status);
break;
}
@@ -385,12 +386,6 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
int num_unqueued_files = num_unqueued_files_.Load();
ScanRange* scan_range;
Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
- if (status.ok() && scan_range != nullptr) {
- // Got a scan range. Process the range end to end (in this thread).
- status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
- &expr_results_pool, scan_range, &scanner_thread_reservation);
- }
-
if (!status.ok()) {
unique_lock<mutex> l(lock_);
// If there was already an error, the main thread will do the cleanup
@@ -401,6 +396,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
SetDoneInternal(status);
break;
}
+ if (scan_range != nullptr) {
+ // Got a scan range. Process the range end to end (in this thread).
+ ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+ &expr_results_pool, scan_range, &scanner_thread_reservation);
+ }
// Done with range and it completed successfully
if (progress_.done()) {
@@ -444,7 +444,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
thread_state_.DecrementNumActive();
}
-Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
+void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool, ScanRange* scan_range,
int64_t* scanner_thread_reservation) {
DCHECK(scan_range != nullptr);
@@ -467,14 +467,14 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
HdfsScanNodeBase::RangeComplete(partition->file_format(), desc->file_compression,
true);
}
- return Status::OK();
+ return;
}
ScannerContext context(runtime_state_, this, buffer_pool_client(),
*scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
context.AddStream(scan_range, *scanner_thread_reservation);
scoped_ptr<HdfsScanner> scanner;
- Status status = CreateAndOpenScanner(partition, &context, &scanner);
+ Status status = CreateAndOpenScannerHelper(partition, &context, &scanner);
if (!status.ok()) {
// If preparation fails, avoid leaking unread buffers in the scan_range.
scan_range->Cancel(status);
@@ -486,7 +486,12 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
ss << endl << runtime_state_->ErrorLog();
VLOG_QUERY << ss.str();
}
- return status;
+
+ // Ensure that the error is propagated before marking a range as complete (The
+ // scanner->Close() call marks a scan range as complete).
+ SetError(status);
+ if (scanner != nullptr) scanner->Close();
+ return;
}
status = scanner->ProcessSplit();
@@ -511,9 +516,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
// HdfsScanNodeBase::status_ variable and notify other scanner threads. Ensure that
// status_ is updated before marking a range as complete (The scanner->Close() call
// marks a scan range as complete).
- unique_lock<mutex> l(lock_);
- // Update the status_ and set the done_ flag if this is the first non-ok status.
- SetDoneInternal(status);
+ SetError(status);
}
// Transfer remaining resources to a final batch and add it to the row batch queue and
// decrement progress_ to indicate that the scan range is complete.
@@ -521,7 +524,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
// Reservation may have been increased by the scanner, e.g. Parquet may allocate
// additional reservation to scan columns.
*scanner_thread_reservation = context.total_reservation();
- return status;
}
void HdfsScanNode::SetDoneInternal(const Status& status) {
@@ -538,3 +540,9 @@ void HdfsScanNode::SetDone() {
unique_lock<mutex> l(lock_);
SetDoneInternal(status_);
}
+
+void HdfsScanNode::SetError(const Status& status) {
+ discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
+ unique_lock<mutex> l(lock_);
+ SetDoneInternal(status);
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 0e0f67c..853da31 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -162,10 +162,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
/// in this split. 'scanner_thread_reservation' is an in/out argument that tracks the
/// total reservation from 'buffer_pool_client_' that is allotted for this thread's
- /// use.
- Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
+ /// use. If an error is encountered, calls SetDoneInternal() with the error to
+ /// initiate shutdown of the scan.
+ void ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool, io::ScanRange* scan_range,
- int64_t* scanner_thread_reservation) WARN_UNUSED_RESULT;
+ int64_t* scanner_thread_reservation);
/// Called by scanner thread to return some or all of its reservation that is not
/// needed. Always holds onto at least the minimum reservation to avoid violating the
@@ -185,6 +186,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// Gets lock_ and calls SetDoneInternal(status_). Usually used after the scan node
/// completes execution successfully.
void SetDone();
+
+ /// Gets lock_ and calls SetDoneInternal(status). Called after a scanner hits an
+ /// error. Must be called before HdfsScanner::Close() to ensure that 'status'
+ /// is propagated before the scan range is marked as complete by HdfsScanner::Close().
+ void SetError(const Status& status);
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index d642313..9cbc99b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -148,6 +148,8 @@ class HdfsScanner {
/// and memory in mem pools to the given row batch. If the row batch is NULL,
/// those resources are released instead. In any case, releases all other resources
/// that are not backing returned rows (e.g. temporary decompression buffers).
+ /// Also marks any associated scan ranges as complete by calling RangeComplete() on the
+ /// scan node.
/// This function is not idempotent and must only be called once.
virtual void Close(RowBatch* row_batch) = 0;
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 5d245fb..331e432 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -60,6 +60,9 @@ enum TExecNodePhase {
GETNEXT,
GETNEXT_SCANNER,
CLOSE,
+ // After a scanner thread completes a range with an error but before it propagates the
+ // error.
+ SCANNER_ERROR,
INVALID
}
@@ -72,6 +75,8 @@ enum TDebugAction {
// A floating point number in range [0.0, 1.0] that gives the probability of denying
// each reservation increase request after the initial reservation.
SET_DENY_RESERVATION_PROBABILITY,
+ // Delay for a short amount of time: 100ms
+ DELAY,
}
// Preference for replica selection
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
new file mode 100644
index 0000000..4104595
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
@@ -0,0 +1,13 @@
+====
+---- QUERY
+# Add a valid file with a single row to the table.
+INSERT INTO bad_magic_number SELECT 'good';
+---- RESULTS
+: 1
+====
+---- QUERY
+set debug_action="0:SCANNER_ERROR:DELAY";
+SELECT * FROM bad_magic_number
+---- CATCH
+invalid version number
+====
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index d6cc1da..4e879a7 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -32,7 +32,10 @@ from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_vector import ImpalaTestDimension
FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
+# Not included:
+# - SCANNER_ERROR, because it only fires if the query already hit an error.
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER',
+ 'CLOSE']
# Map debug actions to their corresponding query options' values.
FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index b75c3cc..e8a24b8 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -722,6 +722,21 @@ class TestParquet(ImpalaTestSuite):
self.run_test_case("QueryTest/parquet-type-widening", vector, unique_database)
+ def test_error_propagation_race(self, vector, unique_database):
+ """IMPALA-7662: failed scan signals completion before error is propagated. To
+ reproduce, we construct a table with two Parquet files, one valid and another
+ invalid. The scanner thread for the invalid file must propagate the error
+ before we mark the whole scan complete."""
+ if vector.get_value('exec_option')['debug_action'] is not None:
+ pytest.skip(".test file needs to override debug action")
+ create_table_and_copy_files(self.client,
+ "CREATE TABLE {db}.{tbl} (s STRING) STORED AS PARQUET",
+ unique_database, "bad_magic_number", ["testdata/data/bad_magic_number.parquet"])
+ # We need the ranges to all be scheduled on the same impalad.
+ vector.get_value('exec_option')['num_nodes'] = 1
+ self.run_test_case("QueryTest/parquet-error-propagation-race", vector,
+ unique_database)
+
# We use various scan range lengths to exercise corner cases in the HDFS scanner more
# thoroughly. In particular, it will exercise:
# 1. default scan range