You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/19 08:46:16 UTC

[1/2] kudu git commit: spark: continue scanning after encountering empty batch

Repository: kudu
Updated Branches:
  refs/heads/master 00acc92a8 -> 2840a586d


spark: continue scanning after encountering empty batch

The Spark connector would previously stop scanning after the first empty
batch returned by a tablet server. The tablet server will not return an
empty batch when there are rows remaining in the tablet unless the scan
hits an internal timeout of 500ms[1]. This can only realistically happen
on large scans with highly selective predicates on data not in the block
cache. As a result this behavior only occurs with very large tables on
slow tablet server, which makes it very hard to test.  No unit tests are
included with this patch, but the fix has been verified on a real
cluster exhibiting the issue.

[1] https://github.com/apache/kudu/blob/2ed179a7a188b4748a43a829940764ab5dddbc1c/src/kudu/tserver/tablet_service.cc#L1670

Change-Id: I4fdb7836a27940cab674100da0ef0ea5e050bbdd
Reviewed-on: http://gerrit.cloudera.org:8080/5531
Tested-by: Kudu Jenkins
Reviewed-by: Chris George <ch...@rms.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cd02f9d4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cd02f9d4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cd02f9d4

Branch: refs/heads/master
Commit: cd02f9d409f4d7b06863b92d5d9d325bb05b8d55
Parents: 00acc92
Author: Dan Burkert <da...@apache.org>
Authored: Thu Dec 15 15:41:24 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Dec 19 05:12:43 2016 +0000

----------------------------------------------------------------------
 .../src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala      | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cd02f9d4/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 5ba7c35..a376d56 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -95,8 +95,8 @@ private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) ex
   private var currentIterator: RowResultIterator = null
 
   override def hasNext: Boolean = {
-    if ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
-      (scanner.hasMoreRows && currentIterator == null)) {
+    while ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
+           (scanner.hasMoreRows && currentIterator == null)) {
       currentIterator = scanner.nextRows()
     }
     currentIterator.hasNext


[2/2] kudu git commit: threadpool: avoid calling task destructors while holding lock

Posted by to...@apache.org.
threadpool: avoid calling task destructors while holding lock

Currently, the ThreadPool calls task's destructors while holding its
internal lock. This has two issues:

1) While it's not a great practice, some destructors themselves acquire
locks. This can create lock cycles if those same locks are ever held
concurrently with a submission to the threadpool.

2) Even well-written destructors can be heavy-weight, eg if they free a
large graph of dependent objects. We should not block the threadpool's
progress while running these destructors.

This fixes the issue by being more explicit about where tasks are freed,
both in the Shutdown() code path as well as the normal task dispatch
code.

Change-Id: I4cdc38c7db0a6a7fd640d82895ecaebe0718ee98
Reviewed-on: http://gerrit.cloudera.org:8080/5517
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/2840a586
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/2840a586
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/2840a586

Branch: refs/heads/master
Commit: 2840a586d57012fc4bd805f6aca810e4f48dd37b
Parents: cd02f9d
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Dec 15 17:33:03 2016 +0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Dec 19 08:45:20 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/threadpool-test.cc | 24 +++++++++++++++++++++++
 src/kudu/util/threadpool.cc      | 36 +++++++++++++++++++++++------------
 src/kudu/util/threadpool.h       |  3 ---
 3 files changed, 48 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/2840a586/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 444fbf4..085afcc 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -349,4 +349,28 @@ TEST(TestThreadPool, TestDeadlocks) {
 }
 #endif
 
+class SlowDestructorRunnable : public Runnable {
+ public:
+  void Run() override {}
+
+  virtual ~SlowDestructorRunnable() {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+};
+
+// Test that if a tasks's destructor is slow, it doesn't cause serialization of the tasks
+// in the queue.
+TEST(TestThreadPool, TestSlowDestructor) {
+  gscoped_ptr<ThreadPool> thread_pool;
+  ASSERT_OK(BuildMinMaxTestPool(1, 20, &thread_pool));
+  MonoTime start = MonoTime::Now();
+  for (int i = 0; i < 100; i++) {
+    shared_ptr<Runnable> task(new SlowDestructorRunnable());
+    ASSERT_OK(thread_pool->Submit(std::move(task)));
+  }
+  thread_pool->Wait();
+  ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
+}
+
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/2840a586/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index b2def9c..9ad1cba 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -146,22 +146,19 @@ Status ThreadPool::Init() {
   return Status::OK();
 }
 
-void ThreadPool::ClearQueue() {
-  for (QueueEntry& e : queue_) {
-    if (e.trace) {
-      e.trace->Release();
-    }
-  }
-  queue_.clear();
-  queue_size_ = 0;
-}
-
 void ThreadPool::Shutdown() {
   MutexLock unique_lock(lock_);
   CheckNotPoolThreadUnlocked();
 
   pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");
-  ClearQueue();
+
+  // Clear the queue_ member under the lock, but defer the releasing
+  // of the entries outside the lock, in case there are concurrent threads
+  // wanting to access the ThreadPool. The task's destructors may acquire
+  // locks, etc, so this also prevents lock inversions.
+  auto to_release = std::move(queue_);
+  queue_.clear();
+  queue_size_ = 0;
   not_empty_.Broadcast();
 
   // The Runnable doesn't have Abort() so we must wait
@@ -169,6 +166,14 @@ void ThreadPool::Shutdown() {
   while (num_threads_ > 0) {
     no_threads_cond_.Wait();
   }
+
+  // Finally release the tasks that were in the queue, outside the lock.
+  unique_lock.Unlock();
+  for (QueueEntry& e : to_release) {
+    if (e.trace) {
+      e.trace->Release();
+    }
+  }
 }
 
 Status ThreadPool::SubmitClosure(const Closure& task) {
@@ -313,7 +318,7 @@ void ThreadPool::DispatchThread(bool permanent) {
     }
 
     // Fetch a pending task
-    QueueEntry entry = queue_.front();
+    QueueEntry entry = std::move(queue_.front());
     queue_.pop_front();
     queue_size_--;
     ++active_threads_;
@@ -350,6 +355,13 @@ void ThreadPool::DispatchThread(bool permanent) {
       TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
       TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
     }
+    // Destruct the task while we do not hold the lock.
+    //
+    // The task's destructor may be expensive if it has a lot of bound
+    // objects, and we don't want to block submission of the threadpool.
+    // In the worst case, the destructor might even try to do something
+    // with this threadpool, and produce a deadlock.
+    entry.runnable.reset();
     unique_lock.Lock();
 
     if (--active_threads_ == 0) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/2840a586/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index cfa12f4..0bbce7d 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -201,9 +201,6 @@ class ThreadPool {
   // Initialize the thread pool by starting the minimum number of threads.
   Status Init();
 
-  // Clear all entries from queue_. Requires that lock_ is held.
-  void ClearQueue();
-
   // Dispatcher responsible for dequeueing and executing the tasks
   void DispatchThread(bool permanent);