You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2016/07/30 01:49:36 UTC

[5/8] incubator-impala git commit: IMPALA-3857: KuduScanNode race on returning "optional" threads

IMPALA-3857: KuduScanNode race on returning "optional" threads

The KuduScanNode could return all active scanner threads
when there were no more "optional" tokens available. In this
case, any remaining scan ranges wouldn't be picked up and
the query would produce incorrect results. This fixes the
issue by cleaning up the ScannerThread code and making sure
not to exit the last thread.

This was tested by running the tpch workload repeatedly
under load. The work to incorporate tpch data loading for
Kudu is still in progress, so this was tested manually.

Change-Id: I22adf2109b43b1b37d9a597de85e063431dff155
Reviewed-on: http://gerrit.cloudera.org:8080/3798
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/857b94d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/857b94d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/857b94d0

Branch: refs/heads/master
Commit: 857b94d03cf719da37d6bb95695c57944f004d36
Parents: c77fb62
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Mon Jul 25 16:59:56 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Jul 29 21:36:50 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node.cc | 98 +++++++++++++++++++++++---------------
 be/src/exec/kudu-scan-node.h  | 10 ++--
 2 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index a08034f..c2ed16c 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -408,60 +408,80 @@ void KuduScanNode::ThreadTokenAvailableCb(
   }
 }
 
-void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* key_range) {
+Status KuduScanNode::ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range) {
+  RETURN_IF_ERROR(scanner->OpenNextRange(*key_range));
+  bool eos = false;
+  while (!eos) {
+    gscoped_ptr<RowBatch> row_batch(new RowBatch(
+          row_desc(), runtime_state_->batch_size(), mem_tracker()));
+    RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
+    while (!done_) {
+      scanner->KeepKuduScannerAlive();
+      if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
+        ignore_result(row_batch.release());
+        break;
+      }
+    }
+  }
+  // Mark the current scan range as complete.
+  if (eos) scan_ranges_complete_counter()->Add(1);
+  return Status::OK();
+}
+
+void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initial_range) {
+  DCHECK(initial_range != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_TIMER(runtime_state_->total_cpu_timer());
 
+  // Set to true if this thread observes that the number of optional threads has been
+  // exceeded and is exiting early.
+  bool optional_thread_exiting = false;
   KuduScanner scanner(this, runtime_state_);
   Status status = scanner.Open();
-  if (!status.ok()) goto done;
-
-  while (!done_) {
-    status = scanner.OpenNextRange(*key_range);
-    if (!status.ok()) goto done;
 
-    // Keep looping through all the ranges.
-    bool eos = false;
-    while (!eos) {
-      // Keep looping through all the rows.
-      gscoped_ptr<RowBatch> row_batch(new RowBatch(
-          row_desc(), runtime_state_->batch_size(), mem_tracker()));
-      status = scanner.GetNext(row_batch.get(), &eos);
-      if (!status.ok()) goto done;
-      while (true) {
-        if (done_) goto done;
-        scanner.KeepKuduScannerAlive();
-        if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
-          ignore_result(row_batch.release());
+  if (status.ok()) {
+    const TKuduKeyRange* key_range = initial_range;
+    while (!done_ && key_range != NULL) {
+      status = ProcessRange(&scanner, key_range);
+      if (!status.ok()) break;
+
+      // Check if the number of optional threads has been exceeded.
+      if (runtime_state_->resource_pool()->optional_exceeded()) {
+        unique_lock<mutex> l(lock_);
+        // Don't exit if this is the last thread. Otherwise, the scan will indicate it's
+        // done before all ranges have been processed.
+        if (num_active_scanners_ > 1) {
+          --num_active_scanners_;
+          optional_thread_exiting = true;
           break;
         }
       }
+      key_range = GetNextKeyRange();
     }
-    // Mark the current scan range as complete.
-    scan_ranges_complete_counter()->Add(1);
-    if (runtime_state_->resource_pool()->optional_exceeded()) goto done;
-    key_range = GetNextKeyRange();
-    if (key_range == NULL) goto done;
+    scanner.Close();
   }
 
-done:
-  VLOG(1) << "Thread done: " << name;
-  scanner.Close();
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
-
-  unique_lock<mutex> l(lock_);
-  if (!status.ok()) {
-    if (status_.ok()) {
-      status_ = status;
+  {
+    unique_lock<mutex> l(lock_);
+    if (!status.ok()) {
+      if (status_.ok()) {
+        status_ = status;
+        done_ = true;
+      }
+    }
+    // Decrement num_active_scanners_ unless handling the case of an early exit when
+    // optional threads have been exceeded, in which case it already was decremented.
+    if (!optional_thread_exiting) --num_active_scanners_;
+    if (num_active_scanners_ == 0) {
       done_ = true;
+      materialized_row_batches_->Shutdown();
     }
   }
-  --num_active_scanners_;
-  if (num_active_scanners_ == 0) {
-    // If we got here and we are the last thread, we're all done.
-    done_ = true;
-    materialized_row_batches_->Shutdown();
-  }
+
+  // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
+  // invokes ThreadTokenAvailableCb() which attempts to take the same lock.
+  VLOG(1) << "Thread done: " << name;
+  runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 329f52a..5dfb309 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -163,11 +163,15 @@ class KuduScanNode : public ScanNode {
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
 
   /// Main function for scanner thread which executes a KuduScanner. Begins by processing
-  /// 'initial_range', once that range is completed it fetches more ranges with 'GetNextKeyRange()'
-  /// until there are no more ranges to fetch, an error occurred or the limit has been reached.
-  /// Scanned batches are enqueued in 'materialized_row_batches_'.
+  /// 'initial_range', and continues processing ranges returned by 'GetNextKeyRange()'
+  /// until there are no more ranges, an error occurs, or the limit is reached.
   void ScannerThread(const string& name, const TKuduKeyRange* initial_range);
 
+  /// Processes a single scan range. Row batches are fetched using 'scanner' and enqueued
+  /// in 'materialized_row_batches_' until the scanner reports eos for 'key_range', an
+  /// error occurs, or the limit is reached.
+  Status ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range);
+
   /// Returns the next partition key range to read. Thread safe. Returns NULL if there are
   /// no more ranges.
   TKuduKeyRange* GetNextKeyRange();