You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/27 05:19:03 UTC

[5/5] impala git commit: IMPALA-7662: fix error race when scanner open fails

IMPALA-7662: fix error race when scanner open fails

This is very similar to IMPALA-7335, except that it happens
when 'progress_' is incremented in the call chain
HdfsScanNode::ProcessSplit
-> HdfsScanNodeBase::CreateAndOpenScanner()
-> HdfsScanner::Close()

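The fix restructures the code so that SetDoneInternal() is called
with the error *before* HdfsScanner::Close(), which marks the scan
range as complete. Otherwise another scanner thread could see all
ranges complete and finish the scan before the error was recorded.
This needed a refactoring because HdfsScanNodeBase doesn't actually
know about SetDoneInternal().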

My fix is to move the logic shared by HdfsScanNode and
HdfsScanNodeMt into a helper in HdfsScanNodeBase
(CreateAndOpenScannerHelper()), and then, in HdfsScanNode, to make
sure SetDoneInternal() is called before the scanner is closed.

I also reworked HdfsScanNode::ProcessSplit() to handle error propagation
internally. Splitting the responsibility for handling errors between
ProcessSplit() and its caller made the code harder to reason about than necessary.

Testing:
Added a DELAY debug action and a SCANNER_ERROR debug phase, plus a test
that reproduced the race before the fix.

Change-Id: I45a61210ca7d057b048c77d9f2f2695ec450f19b
Reviewed-on: http://gerrit.cloudera.org:8080/11596
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/449fe73d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/449fe73d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/449fe73d

Branch: refs/heads/master
Commit: 449fe73d2145bd22f0f857623c3652a097f06d73
Parents: 2e5d658
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 5 10:13:36 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 27 04:07:51 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        |  7 ++--
 be/src/exec/hdfs-scan-node-base.cc              | 22 ++++--------
 be/src/exec/hdfs-scan-node-base.h               | 11 ++++--
 be/src/exec/hdfs-scan-node-mt.cc                | 13 ++++++-
 be/src/exec/hdfs-scan-node-mt.h                 |  6 ++++
 be/src/exec/hdfs-scan-node.cc                   | 36 ++++++++++++--------
 be/src/exec/hdfs-scan-node.h                    | 12 +++++--
 be/src/exec/hdfs-scanner.h                      |  2 ++
 common/thrift/PlanNodes.thrift                  |  5 +++
 .../parquet-error-propagation-race.test         | 13 +++++++
 tests/failure/test_failpoints.py                |  5 ++-
 tests/query_test/test_scanners.py               | 15 ++++++++
 12 files changed, 108 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index b9406dd..3a606f5 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -392,8 +392,7 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     return Status::OK();
   } else if (debug_options_.action == TDebugAction::MEM_LIMIT_EXCEEDED) {
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
-  } else {
-    DCHECK_EQ(debug_options_.action, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+  } else if (debug_options_.action == TDebugAction::SET_DENY_RESERVATION_PROBABILITY) {
     // We can only enable the debug action if the buffer pool client is registered.
     // If the buffer client is not registered at this point (e.g. if phase is PREPARE or
     // OPEN), then we will enable the debug action at the time when the client is
@@ -401,6 +400,10 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     if (reservation_manager_.buffer_pool_client()->is_registered()) {
       RETURN_IF_ERROR(reservation_manager_.EnableDenyReservationDebugAction());
     }
+  } else {
+    DCHECK_EQ(debug_options_.action, TDebugAction::DELAY);
+    VLOG(1) << "DEBUG_ACTION: Sleeping";
+    SleepForMs(100);
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b11dcdc..4a39124 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -624,9 +624,10 @@ void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
   return it->second;
 }
 
-Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+Status HdfsScanNodeBase::CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
     ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
-  DCHECK(context != NULL);
+  DCHECK(context != nullptr);
+  DCHECK(scanner->get() == nullptr);
   THdfsCompression::type compression =
       context->GetStream()->file_desc()->file_compression;
 
@@ -663,19 +664,10 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
       return Status(Substitute("Unknown Hdfs file format type: $0",
           partition->file_format()));
   }
-  DCHECK(scanner->get() != NULL);
-  Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
-  if (status.ok()) {
-    status = scanner->get()->Open(context);
-    if (!status.ok()) {
-      scanner->get()->Close(nullptr);
-      scanner->reset();
-    }
-  } else {
-    context->ClearStreams();
-    scanner->reset();
-  }
-  return status;
+  DCHECK(scanner->get() != nullptr);
+  RETURN_IF_ERROR(scanner->get()->Open(context));
+  // Inject the error after the scanner is opened, to test the scanner close path.
+  return ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
 }
 
 Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index b63d9c5..0a5a328 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -556,9 +556,14 @@ class HdfsScanNodeBase : public ScanNode {
   /// buffers.
   Status StartNextScanRange(int64_t* reservation, io::ScanRange** scan_range);
 
-  /// Create and open new scanner for this partition type.
-  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
-  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+  /// Helper for the CreateAndOpenScanner() implementations in the subclass. Creates and
+  /// opens a new scanner for this partition type. Depending on the outcome, the
+  /// behaviour differs:
+  /// - If the scanner is successfully created and opened, returns OK and sets *scanner.
+  /// - If the scanner cannot be created, returns an error and does not set *scanner.
+  /// - If the scanner is created but opening fails, returns an error and sets *scanner.
+  ///   The caller is then responsible for closing the scanner.
+  Status CreateAndOpenScannerHelper(HdfsPartitionDescriptor* partition,
       ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
       WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index f3a2253..4e59e0a 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -27,8 +27,9 @@
 
 #include "gen-cpp/PlanNodes_types.h"
 
+#include "common/names.h"
+
 using namespace impala::io;
-using std::stringstream;
 
 namespace impala {
 
@@ -126,6 +127,16 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   return Status::OK();
 }
 
+Status HdfsScanNodeMt::CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+    ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) {
+  Status status = CreateAndOpenScannerHelper(partition, context, scanner);
+  if (!status.ok() && scanner->get() != nullptr) {
+    scanner->get()->Close(nullptr);
+    scanner->reset();
+  }
+  return status;
+}
+
 void HdfsScanNodeMt::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (scanner_.get() != nullptr) scanner_->Close(nullptr);

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 78d7718..22c3d46 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -49,6 +49,12 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   virtual bool HasRowBatchQueue() const override { return false; }
 
  private:
+  /// Create and open new scanner for this partition type.
+  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
+  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner)
+      WARN_UNUSED_RESULT;
+
   /// Current scan range and corresponding scanner.
   io::ScanRange* scan_range_;
   boost::scoped_ptr<ScannerContext> scanner_ctx_;

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 6c01bf4..04b6589 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -338,6 +338,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
       // Abort the query. This is still holding the lock_, so done_ is known to be
       // false and status_ must be ok.
+      discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
       SetDoneInternal(status);
       break;
     }
@@ -385,12 +386,6 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
     int num_unqueued_files = num_unqueued_files_.Load();
     ScanRange* scan_range;
     Status status = StartNextScanRange(&scanner_thread_reservation, &scan_range);
-    if (status.ok() && scan_range != nullptr) {
-      // Got a scan range. Process the range end to end (in this thread).
-      status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
-          &expr_results_pool, scan_range, &scanner_thread_reservation);
-    }
-
     if (!status.ok()) {
       unique_lock<mutex> l(lock_);
       // If there was already an error, the main thread will do the cleanup
@@ -401,6 +396,11 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
       SetDoneInternal(status);
       break;
     }
+    if (scan_range != nullptr) {
+      // Got a scan range. Process the range end to end (in this thread).
+      ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+          &expr_results_pool, scan_range, &scanner_thread_reservation);
+    }
 
     // Done with range and it completed successfully
     if (progress_.done()) {
@@ -444,7 +444,7 @@ void HdfsScanNode::ScannerThread(bool first_thread, int64_t scanner_thread_reser
   thread_state_.DecrementNumActive();
 }
 
-Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
+void HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     MemPool* expr_results_pool, ScanRange* scan_range,
     int64_t* scanner_thread_reservation) {
   DCHECK(scan_range != nullptr);
@@ -467,14 +467,14 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
       HdfsScanNodeBase::RangeComplete(partition->file_format(), desc->file_compression,
           true);
     }
-    return Status::OK();
+    return;
   }
 
   ScannerContext context(runtime_state_, this, buffer_pool_client(),
       *scanner_thread_reservation, partition, filter_ctxs, expr_results_pool);
   context.AddStream(scan_range, *scanner_thread_reservation);
   scoped_ptr<HdfsScanner> scanner;
-  Status status = CreateAndOpenScanner(partition, &context, &scanner);
+  Status status = CreateAndOpenScannerHelper(partition, &context, &scanner);
   if (!status.ok()) {
     // If preparation fails, avoid leaking unread buffers in the scan_range.
     scan_range->Cancel(status);
@@ -486,7 +486,12 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
       ss << endl << runtime_state_->ErrorLog();
       VLOG_QUERY << ss.str();
     }
-    return status;
+
+    // Ensure that the error is propagated before marking a range as complete (The
+    // scanner->Close() call marks a scan range as complete).
+    SetError(status);
+    if (scanner != nullptr) scanner->Close();
+    return;
   }
 
   status = scanner->ProcessSplit();
@@ -511,9 +516,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     // HdfsScanNodeBase::status_ variable and notify other scanner threads. Ensure that
     // status_ is updated before marking a range as complete (The scanner->Close() call
     // marks a scan range as complete).
-    unique_lock<mutex> l(lock_);
-    // Update the status_ and set the done_ flag if this is the first non-ok status.
-    SetDoneInternal(status);
+    SetError(status);
   }
   // Transfer remaining resources to a final batch and add it to the row batch queue and
   // decrement progress_ to indicate that the scan range is complete.
@@ -521,7 +524,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   // Reservation may have been increased by the scanner, e.g. Parquet may allocate
   // additional reservation to scan columns.
   *scanner_thread_reservation = context.total_reservation();
-  return status;
 }
 
 void HdfsScanNode::SetDoneInternal(const Status& status) {
@@ -538,3 +540,9 @@ void HdfsScanNode::SetDone() {
   unique_lock<mutex> l(lock_);
   SetDoneInternal(status_);
 }
+
+void HdfsScanNode::SetError(const Status& status) {
+  discard_result(ExecDebugAction(TExecNodePhase::SCANNER_ERROR, runtime_state_));
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal(status);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 0e0f67c..853da31 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -162,10 +162,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
   /// in this split. 'scanner_thread_reservation' is an in/out argument that tracks the
   /// total reservation from 'buffer_pool_client_' that is allotted for this thread's
-  /// use.
-  Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
+  /// use. If an error is encountered, calls SetDoneInternal() with the error to
+  /// initiate shutdown of the scan.
+  void ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool, io::ScanRange* scan_range,
-      int64_t* scanner_thread_reservation) WARN_UNUSED_RESULT;
+      int64_t* scanner_thread_reservation);
 
   /// Called by scanner thread to return some or all of its reservation that is not
   /// needed. Always holds onto at least the minimum reservation to avoid violating the
@@ -185,6 +186,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// Gets lock_ and calls SetDoneInternal(status_). Usually used after the scan node
   /// completes execution successfully.
   void SetDone();
+
+  /// Gets lock_ and calls SetDoneInternal(status). Called after a scanner hits an
+  /// error. Must be called before HdfsScanner::Close() to ensure that 'status'
+  /// is propagated before the scan range is marked as complete by HdfsScanner::Close().
+  void SetError(const Status& status);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index d642313..9cbc99b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -148,6 +148,8 @@ class HdfsScanner {
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
+  /// Also marks any associated scan ranges as complete by calling RangeComplete() on the
+  /// scan node.
   /// This function is not idempotent and must only be called once.
   virtual void Close(RowBatch* row_batch) = 0;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 5d245fb..331e432 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -60,6 +60,9 @@ enum TExecNodePhase {
   GETNEXT,
   GETNEXT_SCANNER,
   CLOSE,
+  // After a scanner thread completes a range with an error but before it propagates the
+  // error.
+  SCANNER_ERROR,
   INVALID
 }
 
@@ -72,6 +75,8 @@ enum TDebugAction {
   // A floating point number in range [0.0, 1.0] that gives the probability of denying
   // each reservation increase request after the initial reservation.
   SET_DENY_RESERVATION_PROBABILITY,
+  // Delay for a short amount of time: 100ms
+  DELAY,
 }
 
 // Preference for replica selection

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
new file mode 100644
index 0000000..4104595
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-error-propagation-race.test
@@ -0,0 +1,13 @@
+====
+---- QUERY
+# Add a valid file with a single row to the table.
+INSERT INTO bad_magic_number SELECT 'good';
+---- RESULTS
+: 1
+====
+---- QUERY
+set debug_action="0:SCANNER_ERROR:DELAY";
+SELECT * FROM bad_magic_number
+---- CATCH
+invalid version number
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index d6cc1da..4e879a7 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -32,7 +32,10 @@ from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 
 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
+# Not included:
+# - SCANNER_ERROR, because it only fires if the query already hit an error.
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER',
+                       'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}

http://git-wip-us.apache.org/repos/asf/impala/blob/449fe73d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index b75c3cc..e8a24b8 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -722,6 +722,21 @@ class TestParquet(ImpalaTestSuite):
 
     self.run_test_case("QueryTest/parquet-type-widening", vector, unique_database)
 
+  def test_error_propagation_race(self, vector, unique_database):
+    """IMPALA-7662: failed scan signals completion before error is propagated. To
+    reproduce, we construct a table with two Parquet files, one valid and another
+    invalid. The scanner thread for the invalid file must propagate the error
+    before we mark the whole scan complete."""
+    if vector.get_value('exec_option')['debug_action'] is not None:
+      pytest.skip(".test file needs to override debug action")
+    create_table_and_copy_files(self.client,
+        "CREATE TABLE {db}.{tbl} (s STRING) STORED AS PARQUET",
+        unique_database, "bad_magic_number", ["testdata/data/bad_magic_number.parquet"])
+    # We need the ranges to all be scheduled on the same impalad.
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case("QueryTest/parquet-error-propagation-race", vector,
+                       unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range