You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/08 06:21:15 UTC

[2/3] incubator-impala git commit: IMPALA-6134: Update code base to use impala::ConditionVariable

IMPALA-6134: Update code base to use impala::ConditionVariable

boost::condtion_variable supports thread interruption
which has some overhead. In some places we already use
impala::ConditionVariable which is a very thin layer
around pthread, therefore it has less overhead.

This commit substitues every boost::condition_variable in
the codebase (except under kudu/) to impala::ConditionVariable.

It also extends impala::ConditionVariable class to support
waiting with a given timeout. The WaitFor function takes a duration
as parameter. The WaitUntil function takes an absolute time as
parameter.

Change-Id: I3085c6dcb42350b61244df6e7f091a1e7db356c9
Reviewed-on: http://gerrit.cloudera.org:8080/8428
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1673e726
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1673e726
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1673e726

Branch: refs/heads/master
Commit: 1673e726bb5dc64d316d699f2f63fa00ac321819
Parents: 9923b82
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue Oct 31 17:30:26 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 8 02:16:26 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc             |  6 ++---
 be/src/catalog/catalog-server.h              |  3 ++-
 be/src/exec/plan-root-sink.cc                | 16 ++++++------
 be/src/exec/plan-root-sink.h                 |  7 +++--
 be/src/rpc/thrift-server.cc                  | 12 ++++-----
 be/src/runtime/coordinator.cc                |  6 ++---
 be/src/runtime/coordinator.h                 |  4 +--
 be/src/runtime/data-stream-mgr.h             |  1 -
 be/src/runtime/data-stream-recvr.cc          | 25 +++++++++---------
 be/src/runtime/data-stream-sender.cc         |  8 +++---
 be/src/runtime/disk-io-mgr-internal.h        | 11 ++++----
 be/src/runtime/disk-io-mgr-reader-context.cc |  2 +-
 be/src/runtime/disk-io-mgr-scan-range.cc     |  6 ++---
 be/src/runtime/disk-io-mgr-stress.cc         |  2 ++
 be/src/runtime/disk-io-mgr-stress.h          |  3 ---
 be/src/runtime/disk-io-mgr-test.cc           | 17 ++++++-------
 be/src/runtime/disk-io-mgr.cc                | 12 ++++-----
 be/src/runtime/disk-io-mgr.h                 |  4 +--
 be/src/runtime/fragment-instance-state.cc    | 16 +++++-------
 be/src/runtime/fragment-instance-state.h     |  5 ++--
 be/src/scheduling/admission-controller.cc    |  8 +++---
 be/src/scheduling/admission-controller.h     |  3 ++-
 be/src/service/client-request-state.cc       |  4 +--
 be/src/service/client-request-state.h        |  3 ++-
 be/src/service/impala-server.cc              | 16 +++++-------
 be/src/service/impala-server.h               |  5 ++--
 be/src/statestore/statestore.h               |  1 -
 be/src/util/blocking-queue.h                 |  4 +--
 be/src/util/condition-variable.h             | 31 ++++++++++++++++++-----
 be/src/util/promise.h                        | 11 ++++----
 be/src/util/thread-pool.h                    |  7 ++---
 31 files changed, 137 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b4745fe..b004b22 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -194,7 +194,7 @@ Status CatalogServer::Start() {
   // Notify the thread to start for the first time.
   {
     lock_guard<mutex> l(catalog_lock_);
-    catalog_update_cv_.notify_one();
+    catalog_update_cv_.NotifyOne();
   }
   return Status::OK();
 }
@@ -253,7 +253,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
 
   // Signal the catalog update gathering thread to start.
   topic_updates_ready_ = false;
-  catalog_update_cv_.notify_one();
+  catalog_update_cv_.NotifyOne();
 }
 
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
@@ -264,7 +264,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
     // when topic_updates_ready_ is false, otherwise we may be in the middle of
     // processing a heartbeat.
     while (topic_updates_ready_) {
-      catalog_update_cv_.wait(unique_lock);
+      catalog_update_cv_.Wait(unique_lock);
     }
 
     MonotonicStopWatch sw;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 452a9b9..78a3f20 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -29,6 +29,7 @@
 #include "gen-cpp/Types_types.h"
 #include "catalog/catalog.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/condition-variable.h"
 #include "util/metrics.h"
 #include "rapidjson/rapidjson.h"
 
@@ -95,7 +96,7 @@ class CatalogServer {
   /// fetch its next set of updates from the JniCatalog. At the end of each statestore
   /// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts
   /// querying the JniCatalog for catalog objects. Protected by the catalog_lock_.
-  boost::condition_variable catalog_update_cv_;
+  ConditionVariable catalog_update_cv_;
 
   /// The latest available set of catalog topic updates (additions/modifications, and
   /// deletions). Set by the catalog_update_gathering_thread_ and protected by

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 9c20ff3..c4ad604 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -73,7 +73,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   // written clients may not cope correctly with them. See IMPALA-4335.
   while (current_batch_row < batch->num_rows()) {
     unique_lock<mutex> l(lock_);
-    while (results_ == nullptr && !consumer_done_) sender_cv_.wait(l);
+    while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l);
     if (consumer_done_ || batch == nullptr) {
       eos_ = true;
       return Status::OK();
@@ -101,7 +101,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
     expr_results_pool_->Clear();
     // Signal the consumer.
     results_ = nullptr;
-    consumer_cv_.notify_all();
+    consumer_cv_.NotifyAll();
   }
   return Status::OK();
 }
@@ -110,7 +110,7 @@ Status PlanRootSink::FlushFinal(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
   sender_done_ = true;
   eos_ = true;
-  consumer_cv_.notify_all();
+  consumer_cv_.NotifyAll();
   return Status::OK();
 }
 
@@ -119,17 +119,17 @@ void PlanRootSink::Close(RuntimeState* state) {
   // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
   // well.
   sender_done_ = true;
-  consumer_cv_.notify_all();
+  consumer_cv_.NotifyAll();
   // Wait for consumer to be done, in case sender tries to tear-down this sink while the
   // sender is still reading from it.
-  while (!consumer_done_) sender_cv_.wait(l);
+  while (!consumer_done_) sender_cv_.Wait(l);
   DataSink::Close(state);
 }
 
 void PlanRootSink::CloseConsumer() {
   unique_lock<mutex> l(lock_);
   consumer_done_ = true;
-  sender_cv_.notify_all();
+  sender_cv_.NotifyAll();
 }
 
 Status PlanRootSink::GetNext(
@@ -138,9 +138,9 @@ Status PlanRootSink::GetNext(
 
   results_ = results;
   num_rows_requested_ = num_results;
-  sender_cv_.notify_all();
+  sender_cv_.NotifyAll();
 
-  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l);
+  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l);
 
   *eos = eos_;
   return state->GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 654bd27..f367d04 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -19,8 +19,7 @@
 #define IMPALA_EXEC_PLAN_ROOT_SINK_H
 
 #include "exec/data-sink.h"
-
-#include <boost/thread/condition_variable.hpp>
+#include "util/condition-variable.h"
 
 namespace impala {
 
@@ -95,12 +94,12 @@ class PlanRootSink : public DataSink {
   /// num_rows_requested_, and so the sender may begin satisfying that request for rows
   /// from its current batch. Also signalled when CloseConsumer() is called, to unblock
   /// the sender.
-  boost::condition_variable sender_cv_;
+  ConditionVariable sender_cv_;
 
   /// Waited on by the consumer only. Signalled when the sender has finished serving a
   /// request for rows. Also signalled by Close() and FlushFinal() to signal to the
   /// consumer that no more rows are coming.
-  boost::condition_variable consumer_cv_;
+  ConditionVariable consumer_cv_;
 
   /// Signals to producer that the consumer is done, and the sink may be torn down.
   bool consumer_done_ = false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 5bf47b2..ab51315 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -18,7 +18,6 @@
 #include <boost/filesystem.hpp>
 #include <boost/thread.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/uuid/uuid_io.hpp>
 
 #include <thrift/concurrency/Thread.h>
@@ -37,6 +36,7 @@
 #include "rpc/authentication.h"
 #include "rpc/thrift-server.h"
 #include "rpc/thrift-thread.h"
+#include "util/condition-variable.h"
 #include "util/debug-util.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
@@ -139,13 +139,13 @@ class ThriftServer::ThriftServerEventProcessor : public TServerEventHandler {
 
  private:
   // Lock used to ensure that there are no missed notifications between starting the
-  // supervision thread and calling signal_cond_.timed_wait. Also used to ensure
+  // supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
   // thread-safe access to members of thrift_server_
   boost::mutex signal_lock_;
 
   // Condition variable that is notified by the supervision thread once either
   // a) all is well or b) an error occurred.
-  boost::condition_variable signal_cond_;
+  ConditionVariable signal_cond_;
 
   // The ThriftServer under management. This class is a friend of ThriftServer, and
   // reaches in to change member variables at will.
@@ -179,7 +179,7 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
   // visibility.
   while (!signal_fired_) {
     // Yields lock and allows supervision thread to continue and signal
-    if (!signal_cond_.timed_wait(lock, deadline)) {
+    if (!signal_cond_.WaitUntil(lock, deadline)) {
       stringstream ss;
       ss << "ThriftServer '" << thrift_server_->name_ << "' (on port: "
          << thrift_server_->port_ << ") did not start within "
@@ -220,7 +220,7 @@ void ThriftServer::ThriftServerEventProcessor::Supervise() {
     // failure, for example.
     signal_fired_ = true;
   }
-  signal_cond_.notify_all();
+  signal_cond_.NotifyAll();
 }
 
 void ThriftServer::ThriftServerEventProcessor::preServe() {
@@ -234,7 +234,7 @@ void ThriftServer::ThriftServerEventProcessor::preServe() {
   thrift_server_->started_ = true;
 
   // Should only be one thread waiting on signal_cond_, but wake all just in case.
-  signal_cond_.notify_all();
+  signal_cond_.NotifyAll();
 }
 
 // This thread-local variable contains the current connection context for whichever

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7fe6fb7..ae0c9ad 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -796,7 +796,7 @@ Status Coordinator::WaitForBackendCompletion() {
   while (num_remaining_backends_ > 0 && query_status_.ok()) {
     VLOG_QUERY << "Coordinator waiting for backends to finish, "
                << num_remaining_backends_ << " remaining";
-    backend_completion_cv_.wait(l);
+    backend_completion_cv_.Wait(l);
   }
   if (query_status_.ok()) {
     VLOG_QUERY << "All backends finished successfully.";
@@ -913,7 +913,7 @@ void Coordinator::CancelInternal() {
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
-  backend_completion_cv_.notify_all();
+  backend_completion_cv_.NotifyAll();
 
   ReleaseExecResourcesLocked();
   ReleaseAdmissionControlResourcesLocked();
@@ -966,7 +966,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
       BackendState::LogFirstInProgress(backend_states_);
     }
     if (--num_remaining_backends_ == 0 || !status.ok()) {
-      backend_completion_cv_.notify_all();
+      backend_completion_cv_.NotifyAll();
     }
     return Status::OK();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 3549ae9..e7ddee9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -28,7 +28,6 @@
 #include <boost/accumulators/statistics/stats.hpp>
 #include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
@@ -41,6 +40,7 @@
 #include "gen-cpp/Types_types.h"
 #include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
 #include "scheduling/query-schedule.h"
+#include "util/condition-variable.h"
 #include "util/progress-updater.h"
 
 namespace impala {
@@ -298,7 +298,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// If there is no coordinator fragment, Wait() simply waits until all
   /// backends report completion by notifying on backend_completion_cv_.
   /// Tied to lock_.
-  boost::condition_variable backend_completion_cv_;
+  ConditionVariable backend_completion_cv_;
 
   /// Count of the number of backends for which done != true. When this
   /// hits 0, any Wait()'ing thread is notified

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index eff468e..1e9f4b6 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -22,7 +22,6 @@
 #include <list>
 #include <set>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index d8150eb..cdea4a0 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -23,13 +23,12 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/sorted-run-merger.h"
+#include "util/condition-variable.h"
 #include "util/runtime-profile-counters.h"
 #include "util/periodic-counter-updater.h"
 
 #include "common/names.h"
 
-using boost::condition_variable;
-
 namespace impala {
 
 // Implements a blocking queue of row batches from one or more senders. One queue
@@ -83,10 +82,10 @@ class DataStreamRecvr::SenderQueue {
   int num_remaining_senders_;
 
   // signal arrival of new batch or the eos/cancelled condition
-  condition_variable data_arrival_cv_;
+  ConditionVariable data_arrival_cv_;
 
   // signal removal of data by stream consumer
-  condition_variable data_removal__cv_;
+  ConditionVariable data_removal__cv_;
 
   // queue of (batch length, batch) pairs.  The SenderQueue block owns memory to
   // these batches. They are handed off to the caller via GetBatch.
@@ -120,7 +119,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
     CANCEL_SAFE_SCOPED_TIMER(
         received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_,
         &is_cancelled_);
-    data_arrival_cv_.wait(l);
+    data_arrival_cv_.Wait(l);
   }
 
   // cur_batch_ must be replaced with the returned batch.
@@ -140,7 +139,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
   recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
   VLOG_ROW << "fetched #rows=" << result->num_rows();
   batch_queue_.pop_front();
-  data_removal__cv_.notify_one();
+  data_removal__cv_.NotifyOne();
   current_batch_.reset(result);
   *next_batch = current_batch_.get();
   return Status::OK();
@@ -175,10 +174,10 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
       try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
       if (timer_lock) {
         CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_wall_timer_, &is_cancelled_);
-        data_removal__cv_.wait(l);
+        data_removal__cv_.Wait(l);
         got_timer_lock = true;
       } else {
-        data_removal__cv_.wait(l);
+        data_removal__cv_.Wait(l);
         got_timer_lock = false;
       }
     }
@@ -197,7 +196,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
     // time it takes this thread to finish (and yield lock_) and the
     // notified thread to be woken up and to acquire the try_lock. In
     // practice, this time is small relative to the total wait time.
-    if (got_timer_lock) data_removal__cv_.notify_one();
+    if (got_timer_lock) data_removal__cv_.NotifyOne();
   }
 
   if (!is_cancelled_) {
@@ -213,7 +212,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
              << " batch_size=" << batch_size << "\n";
     batch_queue_.push_back(make_pair(batch_size, batch));
     recvr_->num_buffered_bytes_.Add(batch_size);
-    data_arrival_cv_.notify_one();
+    data_arrival_cv_.NotifyOne();
   }
 }
 
@@ -225,7 +224,7 @@ void DataStreamRecvr::SenderQueue::DecrementSenders() {
             << recvr_->fragment_instance_id()
             << " node_id=" << recvr_->dest_node_id()
             << " #senders=" << num_remaining_senders_;
-  if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one();
+  if (num_remaining_senders_ == 0) data_arrival_cv_.NotifyOne();
 }
 
 void DataStreamRecvr::SenderQueue::Cancel() {
@@ -239,8 +238,8 @@ void DataStreamRecvr::SenderQueue::Cancel() {
   }
   // Wake up all threads waiting to produce/consume batches.  They will all
   // notice that the stream is cancelled and handle it.
-  data_arrival_cv_.notify_all();
-  data_removal__cv_.notify_all();
+  data_arrival_cv_.NotifyAll();
+  data_removal__cv_.NotifyAll();
 }
 
 void DataStreamRecvr::SenderQueue::Close() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index dc26a23..ed156dc 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -33,6 +33,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/backend-client.h"
 #include "util/aligned-new.h"
+#include "util/condition-variable.h"
 #include "util/debug-util.h"
 #include "util/network-util.h"
 #include "util/thread-pool.h"
@@ -45,7 +46,6 @@
 
 #include "common/names.h"
 
-using boost::condition_variable;
 using namespace apache::thrift;
 using namespace apache::thrift::protocol;
 using namespace apache::thrift::transport;
@@ -133,7 +133,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // TODO: if the order of row batches does not matter, we can consider increasing
   // the number of threads.
   ThreadPool<TRowBatch*> rpc_thread_; // sender thread.
-  condition_variable rpc_done_cv_;   // signaled when rpc_in_flight_ is set to true.
+  ConditionVariable rpc_done_cv_;   // signaled when rpc_in_flight_ is set to true.
   mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_
   bool rpc_in_flight_;  // true if the rpc_thread_ is busy sending.
 
@@ -188,7 +188,7 @@ void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* bat
     unique_lock<mutex> l(rpc_thread_lock_);
     rpc_in_flight_ = false;
   }
-  rpc_done_cv_.notify_one();
+  rpc_done_cv_.NotifyOne();
 }
 
 void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
@@ -239,7 +239,7 @@ void DataStreamSender::Channel::WaitForRpc() {
   SCOPED_TIMER(parent_->state_->total_network_send_timer());
   unique_lock<mutex> l(rpc_thread_lock_);
   while (rpc_in_flight_) {
-    rpc_done_cv_.wait(l);
+    rpc_done_cv_.Wait(l);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index a9acca4..138f3f0 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -28,6 +28,7 @@
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
@@ -51,7 +52,7 @@ struct DiskIoMgr::DiskQueue {
   /// thread should shut down.  A disk thread will be woken up when there is a reader
   /// added to the queue. A reader is only on the queue when it has at least one
   /// scan range that is not blocked on available buffers.
-  boost::condition_variable work_available;
+  ConditionVariable work_available;
 
   /// list of all request contexts that have work queued on this disk
   std::list<DiskIoRequestContext*> request_contexts;
@@ -65,7 +66,7 @@ struct DiskIoMgr::DiskQueue {
           request_contexts.end());
       request_contexts.push_back(worker);
     }
-    work_available.notify_all();
+    work_available.NotifyAll();
   }
 
   DiskQueue(int id) : disk_id(id) { }
@@ -157,7 +158,7 @@ class DiskIoRequestContext {
     // boost doesn't let us dcheck that the reader lock is taken
     DCHECK_GT(num_disks_with_ranges_, 0);
     if (--num_disks_with_ranges_ == 0) {
-      disks_complete_cond_var_.notify_all();
+      disks_complete_cond_var_.NotifyAll();
     }
     DCHECK(Validate()) << std::endl << DebugString();
   }
@@ -289,13 +290,13 @@ class DiskIoRequestContext {
   /// We currently populate one range per disk.
   /// TODO: think about this some more.
   InternalQueue<ScanRange> ready_to_start_ranges_;
-  boost::condition_variable ready_to_start_ranges_cv_;  // used with lock_
+  ConditionVariable ready_to_start_ranges_cv_;  // used with lock_
 
   /// Ranges that are blocked due to back pressure on outgoing buffers.
   InternalQueue<ScanRange> blocked_ranges_;
 
   /// Condition variable for UnregisterContext() to wait for all disks to complete
-  boost::condition_variable disks_complete_cond_var_;
+  ConditionVariable disks_complete_cond_var_;
 
   /// Struct containing state per disk. See comments in the disk read loop on how
   /// they are used.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index 77f332a..afe2b23 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -88,7 +88,7 @@ void DiskIoRequestContext::Cancel(const Status& status) {
 
   // Signal reader and unblock the GetNext/Read thread.  That read will fail with
   // a cancelled status.
-  ready_to_start_ranges_cv_.notify_all();
+  ready_to_start_ranges_cv_.NotifyAll();
 }
 
 void DiskIoRequestContext::AddRequestRange(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index c5c9514..7f0692e 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -67,7 +67,7 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(
     blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT;
   }
 
-  buffer_ready_cv_.notify_one();
+  buffer_ready_cv_.NotifyOne();
 
   return blocked_on_queue_;
 }
@@ -81,7 +81,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
     DCHECK(Validate()) << DebugString();
 
     while (ready_buffers_.empty() && !is_cancelled_) {
-      buffer_ready_cv_.wait(scan_range_lock);
+      buffer_ready_cv_.Wait(scan_range_lock);
     }
 
     if (is_cancelled_) {
@@ -153,7 +153,7 @@ void DiskIoMgr::ScanRange::Cancel(const Status& status) {
     is_cancelled_ = true;
     status_ = status;
   }
-  buffer_ready_cv_.notify_all();
+  buffer_ready_cv_.NotifyAll();
   CleanupQueuedBuffers();
 
   // For cached buffers, we can't close the range until the cached buffer is returned.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index 658b747..3959194 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <boost/thread/mutex.hpp>
+
 #include "runtime/disk-io-mgr-stress.h"
 
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h
index 6d7549e..0a66f2c 100644
--- a/be/src/runtime/disk-io-mgr-stress.h
+++ b/be/src/runtime/disk-io-mgr-stress.h
@@ -22,10 +22,7 @@
 #include <memory>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/thread.hpp>
-#include <boost/unordered_map.hpp>
 
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index a6a719f..8d36ea6 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -27,6 +27,7 @@
 #include "runtime/disk-io-mgr-stress.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
 #include "util/thread.h"
@@ -37,8 +38,6 @@ DECLARE_int32(num_remote_hdfs_io_threads);
 DECLARE_int32(num_s3_io_threads);
 DECLARE_int32(num_adls_io_threads);
 
-using boost::condition_variable;
-
 const int MIN_BUFFER_SIZE = 512;
 const int MAX_BUFFER_SIZE = 1024;
 const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
@@ -72,7 +71,7 @@ class DiskIoMgrTest : public testing::Test {
     {
       lock_guard<mutex> l(written_mutex_);
       ++num_ranges_written_;
-      if (num_ranges_written_ == num_writes) writes_done_.notify_one();
+      if (num_ranges_written_ == num_writes) writes_done_.NotifyOne();
     }
   }
 
@@ -81,7 +80,7 @@ class DiskIoMgrTest : public testing::Test {
     {
       lock_guard<mutex> l(written_mutex_);
       ++num_ranges_written_;
-      if (num_ranges_written_ == num_writes) writes_done_.notify_all();
+      if (num_ranges_written_ == num_writes) writes_done_.NotifyAll();
     }
   }
 
@@ -179,7 +178,7 @@ class DiskIoMgrTest : public testing::Test {
   ObjectPool pool_;
 
   mutex written_mutex_;
-  condition_variable writes_done_;
+  ConditionVariable writes_done_;
   int num_ranges_written_;
 };
 
@@ -229,7 +228,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
 
       {
         unique_lock<mutex> lock(written_mutex_);
-        while (num_ranges_written_ < num_ranges) writes_done_.wait(lock);
+        while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock);
       }
       num_ranges_written_ = 0;
       io_mgr.UnregisterContext(writer);
@@ -283,7 +282,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
 
   {
     unique_lock<mutex> lock(written_mutex_);
-    while (num_ranges_written_ < 2) writes_done_.wait(lock);
+    while (num_ranges_written_ < 2) writes_done_.Wait(lock);
   }
   num_ranges_written_ = 0;
   io_mgr.UnregisterContext(writer);
@@ -342,7 +341,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
 
       {
         unique_lock<mutex> lock(written_mutex_);
-        while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.wait(lock);
+        while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock);
       }
       num_ranges_written_ = 0;
       io_mgr.UnregisterContext(writer);
@@ -810,7 +809,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
 
             {
               unique_lock<mutex> lock(written_mutex_);
-              while (num_ranges_written_ < num_write_ranges) writes_done_.wait(lock);
+              while (num_ranges_written_ < num_write_ranges) writes_done_.Wait(lock);
             }
 
             threads.join_all();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 54dfc98..8af70f5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -343,7 +343,7 @@ DiskIoMgr::~DiskIoMgr() {
       // to shut_down_ are protected.
       unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
     }
-    disk_queues_[i]->work_available.notify_all();
+    disk_queues_[i]->work_available.NotifyAll();
   }
   disk_thread_group_.JoinAll();
 
@@ -469,7 +469,7 @@ void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks
     unique_lock<mutex> lock(context->lock_);
     DCHECK(context->Validate()) << endl << context->DebugString();
     while (context->num_disks_with_ranges_ > 0) {
-      context->disks_complete_cond_var_.wait(lock);
+      context->disks_complete_cond_var_.Wait(lock);
     }
   }
 }
@@ -639,7 +639,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
     }
 
     if (reader->ready_to_start_ranges_.empty()) {
-      reader->ready_to_start_ranges_cv_.wait(reader_lock);
+      reader->ready_to_start_ranges_cv_.Wait(reader_lock);
     } else {
       *range = reader->ready_to_start_ranges_.Dequeue();
       DCHECK(*range != nullptr);
@@ -846,7 +846,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
       while (!shut_down_ && disk_queue->request_contexts.empty()) {
         // wait if there are no readers on the queue
-        disk_queue->work_available.wait(disk_lock);
+        disk_queue->work_available.Wait(disk_lock);
       }
       if (shut_down_) break;
       DCHECK(!disk_queue->request_contexts.empty());
@@ -909,9 +909,9 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
         // All the ranges have been started, notify everyone blocked on GetNextRange.
         // Only one of them will get work so make sure to return nullptr to the other
         // caller threads.
-        (*request_context)->ready_to_start_ranges_cv_.notify_all();
+        (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
       } else {
-        (*request_context)->ready_to_start_ranges_cv_.notify_one();
+        (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index ed33942..e7a7122 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -25,7 +25,6 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_set.hpp>
 #include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
 
 #include "common/atomic.h"
 #include "common/hdfs.h"
@@ -35,6 +34,7 @@
 #include "runtime/thread-resource-mgr.h"
 #include "util/aligned-new.h"
 #include "util/bit-util.h"
+#include "util/condition-variable.h"
 #include "util/error-util.h"
 #include "util/internal-queue.h"
 #include "util/runtime-profile.h"
@@ -563,7 +563,7 @@ class DiskIoMgr : public CacheLineAligned {
 
     /// IO buffers that are queued for this scan range.
     /// Condition variable for GetNext
-    boost::condition_variable buffer_ready_cv_;
+    ConditionVariable buffer_ready_cv_;
     std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
     /// Lock that should be taken during hdfs calls. Only one thread (the disk reading

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index a7d3c86..f957dd1 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -230,7 +230,7 @@ Status FragmentInstanceState::Prepare() {
         thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
     // Make sure the thread started up, otherwise ReportProfileThread() might get into
     // a race with StopReportThread().
-    while (!report_thread_active_) report_thread_started_cv_.wait(l);
+    while (!report_thread_active_) report_thread_started_cv_.Wait(l);
   }
 
   return Status::OK();
@@ -322,28 +322,26 @@ void FragmentInstanceState::ReportProfileThread() {
   unique_lock<mutex> l(report_thread_lock_);
   // tell Prepare() that we started
   report_thread_active_ = true;
-  report_thread_started_cv_.notify_one();
+  report_thread_started_cv_.NotifyOne();
 
   // Jitter the reporting time of remote fragments by a random amount between
   // 0 and the report_interval.  This way, the coordinator doesn't get all the
   // updates at once so its better for contention as well as smoother progress
   // reporting.
   int report_fragment_offset = rand() % FLAGS_status_report_interval;
-  boost::system_time timeout = boost::get_system_time()
-      + boost::posix_time::seconds(report_fragment_offset);
+  boost::posix_time::seconds wait_duration(report_fragment_offset);
   // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.timed_wait(l, timeout);
+  stop_report_thread_cv_.WaitFor(l, wait_duration);
 
   while (report_thread_active_) {
-    boost::system_time timeout = boost::get_system_time()
-        + boost::posix_time::seconds(FLAGS_status_report_interval);
+    boost::posix_time::seconds loop_wait_duration(FLAGS_status_report_interval);
 
     // timed_wait can return because the timeout occurred or the condition variable
     // was signaled.  We can't rely on its return value to distinguish between the
     // two cases (e.g. there is a race here where the wait timed out but before grabbing
     // the lock, the condition variable was signaled).  Instead, we will use an external
     // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.timed_wait(l, timeout);
+    stop_report_thread_cv_.WaitFor(l, loop_wait_duration);
 
     if (!report_thread_active_) break;
     SendReport(false, Status::OK());
@@ -378,7 +376,7 @@ void FragmentInstanceState::StopReportThread() {
     lock_guard<mutex> l(report_thread_lock_);
     report_thread_active_ = false;
   }
-  stop_report_thread_cv_.notify_one();
+  stop_report_thread_cv_.NotifyOne();
   report_thread_->Join();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index fa35c6b..f540b56 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -28,6 +28,7 @@
 
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "runtime/row-batch.h"
+#include "util/condition-variable.h"
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 
@@ -138,11 +139,11 @@ class FragmentInstanceState {
 
   /// Indicates that profile reporting thread should stop.
   /// Tied to report_thread_lock_.
-  boost::condition_variable stop_report_thread_cv_;
+  ConditionVariable stop_report_thread_cv_;
 
   /// Indicates that profile reporting thread started.
   /// Tied to report_thread_lock_.
-  boost::condition_variable report_thread_started_cv_;
+  ConditionVariable report_thread_started_cv_;
 
   /// When the report thread starts, it sets report_thread_active_ to true and signals
   /// report_thread_started_cv_. The report thread is shut down by setting

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 8994a5b..99f659a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -233,7 +233,7 @@ AdmissionController::~AdmissionController() {
     // Lock to ensure the dequeue thread will see the update to done_
     lock_guard<mutex> l(admission_ctrl_lock_);
     done_ = true;
-    dequeue_cv_.notify_one();
+    dequeue_cv_.NotifyOne();
   }
   dequeue_thread_->Join();
 }
@@ -620,7 +620,7 @@ void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
     VLOG_RPC << "Released query id=" << schedule.query_id() << " "
              << stats->DebugString();
   }
-  dequeue_cv_.notify_one();
+  dequeue_cv_.NotifyOne();
 }
 
 // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC.
@@ -648,7 +648,7 @@ void AdmissionController::UpdatePoolStats(
     }
     UpdateClusterAggregates();
   }
-  dequeue_cv_.notify_one(); // Dequeue and admit queries on the dequeue thread
+  dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread
 }
 
 void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
@@ -821,7 +821,7 @@ void AdmissionController::DequeueLoop() {
   while (true) {
     unique_lock<mutex> lock(admission_ctrl_lock_);
     if (done_) break;
-    dequeue_cv_.wait(lock);
+    dequeue_cv_.Wait(lock);
     for (const PoolConfigMap::value_type& entry: pool_config_map_) {
       const string& pool_name = entry.first;
       const TPoolConfig& pool_config = entry.second;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 0cb9f2a..2830bee 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -31,6 +31,7 @@
 #include "scheduling/request-pool-service.h"
 #include "scheduling/query-schedule.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/condition-variable.h"
 #include "util/internal-queue.h"
 #include "util/thread.h"
 
@@ -409,7 +410,7 @@ class AdmissionController {
 
   /// Notifies the dequeuing thread that pool stats have changed and it may be
   /// possible to dequeue and admit queries.
-  boost::condition_variable dequeue_cv_;
+  ConditionVariable dequeue_cv_;
 
   /// If true, tear down the dequeuing thread. This only happens in unit tests.
   bool done_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 523e4ae..5d00d24 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -602,12 +602,12 @@ void ClientRequestState::BlockOnWait() {
     l.lock();
     is_block_on_wait_joining_ = false;
     wait_thread_.reset();
-    block_on_wait_cv_.notify_all();
+    block_on_wait_cv_.NotifyAll();
   } else {
     // Another thread is already joining with wait_thread_.  Block on the cond-var
     // until the Join() executed in the other thread has completed.
     do {
-      block_on_wait_cv_.wait(l);
+      block_on_wait_cv_.Wait(l);
     } while (is_block_on_wait_joining_);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index a5c03f5..968ae04 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -25,6 +25,7 @@
 #include "service/child-query.h"
 #include "service/impala-server.h"
 #include "util/auth-util.h"
+#include "util/condition-variable.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Frontend_types.h"
@@ -254,7 +255,7 @@ class ClientRequestState {
 
   /// Condition variable to make BlockOnWait() thread-safe. One thread joins
   /// wait_thread_, and all other threads block on this cv. Used with lock_.
-  boost::condition_variable block_on_wait_cv_;
+  ConditionVariable block_on_wait_cv_;
 
   /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
   bool is_block_on_wait_joining_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4b225b4..07a83eb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1123,7 +1123,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     multiset<int32_t>::const_iterator itr = session_timeout_set_.find(session_timeout);
     DCHECK(itr != session_timeout_set_.end());
     session_timeout_set_.erase(itr);
-    session_timeout_cv_.notify_one();
+    session_timeout_cv_.NotifyOne();
   }
   return Status::OK();
 }
@@ -1448,7 +1448,7 @@ void ImpalaServer::CatalogUpdateCallback(
     unique_lock<mutex> unique_lock(catalog_version_lock_);
     min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
   }
-  catalog_version_update_cv_.notify_all();
+  catalog_version_update_cv_.NotifyAll();
 }
 
 Status ImpalaServer::ProcessCatalogUpdateResult(
@@ -1487,7 +1487,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
              << " current version: " << catalog_update_info_.catalog_version;
   while (catalog_update_info_.catalog_version < min_req_catalog_version &&
          catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.wait(unique_lock);
+    catalog_version_update_cv_.Wait(unique_lock);
   }
 
   if (!wait_for_all_subscribers) return Status::OK();
@@ -1502,7 +1502,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
              << min_subscriber_catalog_topic_version_;
   while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
          catalog_update_info_.catalog_service_id == catalog_service_id) {
-    catalog_version_update_cv_.wait(unique_lock);
+    catalog_version_update_cv_.Wait(unique_lock);
   }
   return Status::OK();
 }
@@ -1786,7 +1786,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
   if (session_timeout <= 0) return;
   lock_guard<mutex> l(session_timeout_lock_);
   session_timeout_set_.insert(session_timeout);
-  session_timeout_cv_.notify_one();
+  session_timeout_cv_.NotifyOne();
 }
 
 [[noreturn]] void ImpalaServer::ExpireSessions() {
@@ -1794,12 +1794,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
     {
       unique_lock<mutex> timeout_lock(session_timeout_lock_);
       if (session_timeout_set_.empty()) {
-        session_timeout_cv_.wait(timeout_lock);
+        session_timeout_cv_.Wait(timeout_lock);
       } else {
         // Sleep for a second before checking whether an active session can be expired.
-        const int64_t SLEEP_TIME_MS = 1000;
-        system_time deadline = get_system_time() + milliseconds(SLEEP_TIME_MS);
-        session_timeout_cv_.timed_wait(timeout_lock, deadline);
+        session_timeout_cv_.WaitFor(timeout_lock, seconds(1));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f0dfa06..81ec929 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -35,6 +35,7 @@
 #include "common/status.h"
 #include "service/frontend.h"
 #include "service/query-options.h"
+#include "util/condition-variable.h"
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
 #include "util/simple-logger.h"
@@ -800,7 +801,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// session_timeout_thread_ relies on the following conditional variable to wake up
   /// on every poll period expiration or when the poll period changes.
-  boost::condition_variable session_timeout_cv_;
+  ConditionVariable session_timeout_cv_;
 
   /// map from query id to exec state; ClientRequestState is owned by us and referenced
   /// as a shared_ptr to allow asynchronous deletion
@@ -922,7 +923,7 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::mutex catalog_version_lock_;
 
   /// Variable to signal when the catalog version has been modified
-  boost::condition_variable catalog_version_update_cv_;
+  ConditionVariable catalog_version_update_cv_;
 
   /// Contains details on the version information of a catalog update.
   struct CatalogUpdateVersionInfo {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 26aa836..86a0d9e 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -24,7 +24,6 @@
 #include <vector>
 
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a32345c..a4b1b8f 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -19,7 +19,6 @@
 #ifndef IMPALA_UTIL_BLOCKING_QUEUE_H
 #define IMPALA_UTIL_BLOCKING_QUEUE_H
 
-#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <deque>
@@ -141,12 +140,11 @@ class BlockingQueue : public CacheLineAligned {
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     boost::system_time wtime = boost::get_system_time() +
         boost::posix_time::microseconds(timeout_micros);
-    const struct timespec timeout = boost::detail::to_timespec(wtime);
     bool notified = true;
     while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
-      notified = put_cv_.TimedWait(write_lock, &timeout);
+      notified = put_cv_.WaitUntil(write_lock, wtime);
       timer.Stop();
     }
     total_put_wait_time_ += timer.ElapsedTime();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
index b0b1090..c1a1e56 100644
--- a/be/src/util/condition-variable.h
+++ b/be/src/util/condition-variable.h
@@ -20,6 +20,7 @@
 
 #include <boost/thread/pthread/timespec.hpp>
 #include <boost/thread/mutex.hpp>
+#include <boost/thread/thread_time.hpp>
 #include <pthread.h>
 #include <unistd.h>
 
@@ -34,7 +35,7 @@ class ConditionVariable {
   ~ConditionVariable() { pthread_cond_destroy(&cv_); }
 
   /// Wait indefinitely on the condition variable until it's notified.
-  inline void Wait(boost::unique_lock<boost::mutex>& lock) {
+  void Wait(boost::unique_lock<boost::mutex>& lock) {
     DCHECK(lock.owns_lock());
     pthread_mutex_t* mutex = lock.mutex()->native_handle();
     pthread_cond_wait(&cv_, mutex);
@@ -43,18 +44,36 @@ class ConditionVariable {
   /// Wait until the condition variable is notified or 'timeout' has passed.
   /// Returns true if the condition variable is notified before the absolute timeout
   /// specified in 'timeout' has passed. Returns false otherwise.
-  inline bool TimedWait(boost::unique_lock<boost::mutex>& lock,
-      const struct timespec* timeout) {
+  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
+      const timespec& abs_time) {
     DCHECK(lock.owns_lock());
     pthread_mutex_t* mutex = lock.mutex()->native_handle();
-    return pthread_cond_timedwait(&cv_, mutex, timeout) == 0;
+    return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0;
+  }
+
+  /// Wait until the condition variable is notified or 'abs_time' has passed.
+  /// Returns true if the condition variable is notified before the absolute timeout
+  /// specified in 'abs_time' has passed. Returns false otherwise.
+  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
+      const boost::system_time& abs_time) {
+    return WaitUntil(lock, to_timespec(abs_time));
+  }
+
+  /// Wait until the condition variable is notified or have waited for the time
+  /// specified in 'wait_duration'.
+  /// Returns true if the condition variable is notified in time.
+  /// Returns false otherwise.
+  template <typename duration_type>
+  bool WaitFor(boost::unique_lock<boost::mutex>& lock,
+      const duration_type& wait_duration) {
+    return WaitUntil(lock, to_timespec(boost::get_system_time() + wait_duration));
   }
 
   /// Notify a single waiter on this condition variable.
-  inline void NotifyOne() { pthread_cond_signal(&cv_); }
+  void NotifyOne() { pthread_cond_signal(&cv_); }
 
   /// Notify all waiters on this condition variable.
-  inline void NotifyAll() { pthread_cond_broadcast(&cv_); }
+  void NotifyAll() { pthread_cond_broadcast(&cv_); }
 
  private:
   pthread_cond_t cv_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/util/promise.h b/be/src/util/promise.h
index c6b8f15..5de2d13 100644
--- a/be/src/util/promise.h
+++ b/be/src/util/promise.h
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <boost/thread.hpp>
 
+#include "util/condition-variable.h"
 #include "util/time.h"
 #include "common/atomic.h"
 #include "common/logging.h"
@@ -53,9 +54,9 @@ class Promise {
     ///   p.get();
     /// }
     /// < promise object gets destroyed >
-    /// Calling notify_all() with the val_lock_ guarantees that the thread calling
+    /// Calling NotifyAll() with the val_lock_ guarantees that the thread calling
     /// Set() is done and the promise is safe to delete.
-    val_set_cond_.notify_all();
+    val_set_cond_.NotifyAll();
   }
 
   /// Blocks until a value is set, and then returns a reference to that value. Once Get()
@@ -63,7 +64,7 @@ class Promise {
   const T& Get() {
     boost::unique_lock<boost::mutex> l(val_lock_);
     while (!val_is_set_) {
-      val_set_cond_.wait(l);
+      val_set_cond_.Wait(l);
     }
     return val_;
   }
@@ -86,7 +87,7 @@ class Promise {
       boost::posix_time::microseconds wait_time =
           boost::posix_time::microseconds(std::max<int64_t>(
               1, timeout_micros - (now - start)));
-      val_set_cond_.timed_wait(l, wait_time);
+      val_set_cond_.WaitFor(l, wait_time);
       now = MonotonicMicros();
     }
     *timed_out = !val_is_set_;
@@ -102,7 +103,7 @@ class Promise {
  private:
   /// These variables deal with coordination between consumer and producer, and protect
   /// access to val_;
-  boost::condition_variable val_set_cond_;
+  ConditionVariable val_set_cond_;
   bool val_is_set_;
   boost::mutex val_lock_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 4b5dbf0..fedba58 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -24,6 +24,7 @@
 #include <boost/bind/mem_fn.hpp>
 
 #include "util/aligned-new.h"
+#include "util/condition-variable.h"
 #include "util/thread.h"
 
 namespace impala {
@@ -135,7 +136,7 @@ class ThreadPool : public CacheLineAligned {
       // If the ThreadPool is not initialized, then the queue must be empty.
       DCHECK(initialized_ || work_queue_.Size() == 0);
       while (work_queue_.Size() != 0) {
-        empty_cv_.wait(l);
+        empty_cv_.Wait(l);
       }
     }
     Shutdown();
@@ -156,7 +157,7 @@ class ThreadPool : public CacheLineAligned {
         /// GetSize() and wait()'ing when the condition variable is notified.
         /// (It will hang if we notify right before calling wait().)
         boost::unique_lock<boost::mutex> l(lock_);
-        empty_cv_.notify_all();
+        empty_cv_.NotifyAll();
       }
     }
   }
@@ -200,7 +201,7 @@ class ThreadPool : public CacheLineAligned {
   bool shutdown_ = false;
 
   /// Signalled when the queue becomes empty
-  boost::condition_variable empty_cv_;
+  ConditionVariable empty_cv_;
 };
 
 /// Utility thread-pool that accepts callable work items, and simply invokes them.