You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/08 06:21:15 UTC
[2/3] incubator-impala git commit: IMPALA-6134: Update code base to
use impala::ConditionVariable
IMPALA-6134: Update code base to use impala::ConditionVariable
boost::condition_variable supports thread interruption
which has some overhead. In some places we already use
impala::ConditionVariable which is a very thin layer
around pthread, therefore it has less overhead.
This commit substitutes every boost::condition_variable in
the codebase (except under kudu/) with impala::ConditionVariable.
It also extends impala::ConditionVariable class to support
waiting with a given timeout. The WaitFor function takes a duration
as parameter. The WaitUntil function takes an absolute time as
parameter.
Change-Id: I3085c6dcb42350b61244df6e7f091a1e7db356c9
Reviewed-on: http://gerrit.cloudera.org:8080/8428
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1673e726
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1673e726
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1673e726
Branch: refs/heads/master
Commit: 1673e726bb5dc64d316d699f2f63fa00ac321819
Parents: 9923b82
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue Oct 31 17:30:26 2017 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 8 02:16:26 2017 +0000
----------------------------------------------------------------------
be/src/catalog/catalog-server.cc | 6 ++---
be/src/catalog/catalog-server.h | 3 ++-
be/src/exec/plan-root-sink.cc | 16 ++++++------
be/src/exec/plan-root-sink.h | 7 +++--
be/src/rpc/thrift-server.cc | 12 ++++-----
be/src/runtime/coordinator.cc | 6 ++---
be/src/runtime/coordinator.h | 4 +--
be/src/runtime/data-stream-mgr.h | 1 -
be/src/runtime/data-stream-recvr.cc | 25 +++++++++---------
be/src/runtime/data-stream-sender.cc | 8 +++---
be/src/runtime/disk-io-mgr-internal.h | 11 ++++----
be/src/runtime/disk-io-mgr-reader-context.cc | 2 +-
be/src/runtime/disk-io-mgr-scan-range.cc | 6 ++---
be/src/runtime/disk-io-mgr-stress.cc | 2 ++
be/src/runtime/disk-io-mgr-stress.h | 3 ---
be/src/runtime/disk-io-mgr-test.cc | 17 ++++++-------
be/src/runtime/disk-io-mgr.cc | 12 ++++-----
be/src/runtime/disk-io-mgr.h | 4 +--
be/src/runtime/fragment-instance-state.cc | 16 +++++-------
be/src/runtime/fragment-instance-state.h | 5 ++--
be/src/scheduling/admission-controller.cc | 8 +++---
be/src/scheduling/admission-controller.h | 3 ++-
be/src/service/client-request-state.cc | 4 +--
be/src/service/client-request-state.h | 3 ++-
be/src/service/impala-server.cc | 16 +++++-------
be/src/service/impala-server.h | 5 ++--
be/src/statestore/statestore.h | 1 -
be/src/util/blocking-queue.h | 4 +--
be/src/util/condition-variable.h | 31 ++++++++++++++++++-----
be/src/util/promise.h | 11 ++++----
be/src/util/thread-pool.h | 7 ++---
31 files changed, 137 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b4745fe..b004b22 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -194,7 +194,7 @@ Status CatalogServer::Start() {
// Notify the thread to start for the first time.
{
lock_guard<mutex> l(catalog_lock_);
- catalog_update_cv_.notify_one();
+ catalog_update_cv_.NotifyOne();
}
return Status::OK();
}
@@ -253,7 +253,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
// Signal the catalog update gathering thread to start.
topic_updates_ready_ = false;
- catalog_update_cv_.notify_one();
+ catalog_update_cv_.NotifyOne();
}
[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
@@ -264,7 +264,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
// when topic_updates_ready_ is false, otherwise we may be in the middle of
// processing a heartbeat.
while (topic_updates_ready_) {
- catalog_update_cv_.wait(unique_lock);
+ catalog_update_cv_.Wait(unique_lock);
}
MonotonicStopWatch sw;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 452a9b9..78a3f20 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -29,6 +29,7 @@
#include "gen-cpp/Types_types.h"
#include "catalog/catalog.h"
#include "statestore/statestore-subscriber.h"
+#include "util/condition-variable.h"
#include "util/metrics.h"
#include "rapidjson/rapidjson.h"
@@ -95,7 +96,7 @@ class CatalogServer {
/// fetch its next set of updates from the JniCatalog. At the end of each statestore
/// heartbeat, this CV is signaled and the catalog_update_gathering_thread_ starts
/// querying the JniCatalog for catalog objects. Protected by the catalog_lock_.
- boost::condition_variable catalog_update_cv_;
+ ConditionVariable catalog_update_cv_;
/// The latest available set of catalog topic updates (additions/modifications, and
/// deletions). Set by the catalog_update_gathering_thread_ and protected by
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 9c20ff3..c4ad604 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -73,7 +73,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
// written clients may not cope correctly with them. See IMPALA-4335.
while (current_batch_row < batch->num_rows()) {
unique_lock<mutex> l(lock_);
- while (results_ == nullptr && !consumer_done_) sender_cv_.wait(l);
+ while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l);
if (consumer_done_ || batch == nullptr) {
eos_ = true;
return Status::OK();
@@ -101,7 +101,7 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
expr_results_pool_->Clear();
// Signal the consumer.
results_ = nullptr;
- consumer_cv_.notify_all();
+ consumer_cv_.NotifyAll();
}
return Status::OK();
}
@@ -110,7 +110,7 @@ Status PlanRootSink::FlushFinal(RuntimeState* state) {
unique_lock<mutex> l(lock_);
sender_done_ = true;
eos_ = true;
- consumer_cv_.notify_all();
+ consumer_cv_.NotifyAll();
return Status::OK();
}
@@ -119,17 +119,17 @@ void PlanRootSink::Close(RuntimeState* state) {
// No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
// well.
sender_done_ = true;
- consumer_cv_.notify_all();
+ consumer_cv_.NotifyAll();
// Wait for consumer to be done, in case sender tries to tear-down this sink while the
// sender is still reading from it.
- while (!consumer_done_) sender_cv_.wait(l);
+ while (!consumer_done_) sender_cv_.Wait(l);
DataSink::Close(state);
}
void PlanRootSink::CloseConsumer() {
unique_lock<mutex> l(lock_);
consumer_done_ = true;
- sender_cv_.notify_all();
+ sender_cv_.NotifyAll();
}
Status PlanRootSink::GetNext(
@@ -138,9 +138,9 @@ Status PlanRootSink::GetNext(
results_ = results;
num_rows_requested_ = num_results;
- sender_cv_.notify_all();
+ sender_cv_.NotifyAll();
- while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l);
+ while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l);
*eos = eos_;
return state->GetQueryStatus();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 654bd27..f367d04 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -19,8 +19,7 @@
#define IMPALA_EXEC_PLAN_ROOT_SINK_H
#include "exec/data-sink.h"
-
-#include <boost/thread/condition_variable.hpp>
+#include "util/condition-variable.h"
namespace impala {
@@ -95,12 +94,12 @@ class PlanRootSink : public DataSink {
/// num_rows_requested_, and so the sender may begin satisfying that request for rows
/// from its current batch. Also signalled when CloseConsumer() is called, to unblock
/// the sender.
- boost::condition_variable sender_cv_;
+ ConditionVariable sender_cv_;
/// Waited on by the consumer only. Signalled when the sender has finished serving a
/// request for rows. Also signalled by Close() and FlushFinal() to signal to the
/// consumer that no more rows are coming.
- boost::condition_variable consumer_cv_;
+ ConditionVariable consumer_cv_;
/// Signals to producer that the consumer is done, and the sink may be torn down.
bool consumer_done_ = false;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 5bf47b2..ab51315 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -18,7 +18,6 @@
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <thrift/concurrency/Thread.h>
@@ -37,6 +36,7 @@
#include "rpc/authentication.h"
#include "rpc/thrift-server.h"
#include "rpc/thrift-thread.h"
+#include "util/condition-variable.h"
#include "util/debug-util.h"
#include "util/metrics.h"
#include "util/network-util.h"
@@ -139,13 +139,13 @@ class ThriftServer::ThriftServerEventProcessor : public TServerEventHandler {
private:
// Lock used to ensure that there are no missed notifications between starting the
- // supervision thread and calling signal_cond_.timed_wait. Also used to ensure
+ // supervision thread and calling signal_cond_.WaitUntil. Also used to ensure
// thread-safe access to members of thrift_server_
boost::mutex signal_lock_;
// Condition variable that is notified by the supervision thread once either
// a) all is well or b) an error occurred.
- boost::condition_variable signal_cond_;
+ ConditionVariable signal_cond_;
// The ThriftServer under management. This class is a friend of ThriftServer, and
// reaches in to change member variables at will.
@@ -179,7 +179,7 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
// visibility.
while (!signal_fired_) {
// Yields lock and allows supervision thread to continue and signal
- if (!signal_cond_.timed_wait(lock, deadline)) {
+ if (!signal_cond_.WaitUntil(lock, deadline)) {
stringstream ss;
ss << "ThriftServer '" << thrift_server_->name_ << "' (on port: "
<< thrift_server_->port_ << ") did not start within "
@@ -220,7 +220,7 @@ void ThriftServer::ThriftServerEventProcessor::Supervise() {
// failure, for example.
signal_fired_ = true;
}
- signal_cond_.notify_all();
+ signal_cond_.NotifyAll();
}
void ThriftServer::ThriftServerEventProcessor::preServe() {
@@ -234,7 +234,7 @@ void ThriftServer::ThriftServerEventProcessor::preServe() {
thrift_server_->started_ = true;
// Should only be one thread waiting on signal_cond_, but wake all just in case.
- signal_cond_.notify_all();
+ signal_cond_.NotifyAll();
}
// This thread-local variable contains the current connection context for whichever
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7fe6fb7..ae0c9ad 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -796,7 +796,7 @@ Status Coordinator::WaitForBackendCompletion() {
while (num_remaining_backends_ > 0 && query_status_.ok()) {
VLOG_QUERY << "Coordinator waiting for backends to finish, "
<< num_remaining_backends_ << " remaining";
- backend_completion_cv_.wait(l);
+ backend_completion_cv_.Wait(l);
}
if (query_status_.ok()) {
VLOG_QUERY << "All backends finished successfully.";
@@ -913,7 +913,7 @@ void Coordinator::CancelInternal() {
VLOG_QUERY << Substitute(
"CancelBackends() query_id=$0, tried to cancel $1 backends",
PrintId(query_id()), num_cancelled);
- backend_completion_cv_.notify_all();
+ backend_completion_cv_.NotifyAll();
ReleaseExecResourcesLocked();
ReleaseAdmissionControlResourcesLocked();
@@ -966,7 +966,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
BackendState::LogFirstInProgress(backend_states_);
}
if (--num_remaining_backends_ == 0 || !status.ok()) {
- backend_completion_cv_.notify_all();
+ backend_completion_cv_.NotifyAll();
}
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 3549ae9..e7ddee9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -28,7 +28,6 @@
#include <boost/accumulators/statistics/stats.hpp>
#include <boost/accumulators/statistics/variance.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
@@ -41,6 +40,7 @@
#include "gen-cpp/Types_types.h"
#include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle
#include "scheduling/query-schedule.h"
+#include "util/condition-variable.h"
#include "util/progress-updater.h"
namespace impala {
@@ -298,7 +298,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// If there is no coordinator fragment, Wait() simply waits until all
/// backends report completion by notifying on backend_completion_cv_.
/// Tied to lock_.
- boost::condition_variable backend_completion_cv_;
+ ConditionVariable backend_completion_cv_;
/// Count of the number of backends for which done != true. When this
/// hits 0, any Wait()'ing thread is notified
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index eff468e..1e9f4b6 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -22,7 +22,6 @@
#include <list>
#include <set>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index d8150eb..cdea4a0 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -23,13 +23,12 @@
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/sorted-run-merger.h"
+#include "util/condition-variable.h"
#include "util/runtime-profile-counters.h"
#include "util/periodic-counter-updater.h"
#include "common/names.h"
-using boost::condition_variable;
-
namespace impala {
// Implements a blocking queue of row batches from one or more senders. One queue
@@ -83,10 +82,10 @@ class DataStreamRecvr::SenderQueue {
int num_remaining_senders_;
// signal arrival of new batch or the eos/cancelled condition
- condition_variable data_arrival_cv_;
+ ConditionVariable data_arrival_cv_;
// signal removal of data by stream consumer
- condition_variable data_removal__cv_;
+ ConditionVariable data_removal__cv_;
// queue of (batch length, batch) pairs. The SenderQueue block owns memory to
// these batches. They are handed off to the caller via GetBatch.
@@ -120,7 +119,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
CANCEL_SAFE_SCOPED_TIMER(
received_first_batch_ ? NULL : recvr_->first_batch_wait_total_timer_,
&is_cancelled_);
- data_arrival_cv_.wait(l);
+ data_arrival_cv_.Wait(l);
}
// cur_batch_ must be replaced with the returned batch.
@@ -140,7 +139,7 @@ Status DataStreamRecvr::SenderQueue::GetBatch(RowBatch** next_batch) {
recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first);
VLOG_ROW << "fetched #rows=" << result->num_rows();
batch_queue_.pop_front();
- data_removal__cv_.notify_one();
+ data_removal__cv_.NotifyOne();
current_batch_.reset(result);
*next_batch = current_batch_.get();
return Status::OK();
@@ -175,10 +174,10 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
try_mutex::scoped_try_lock timer_lock(recvr_->buffer_wall_timer_lock_);
if (timer_lock) {
CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_wall_timer_, &is_cancelled_);
- data_removal__cv_.wait(l);
+ data_removal__cv_.Wait(l);
got_timer_lock = true;
} else {
- data_removal__cv_.wait(l);
+ data_removal__cv_.Wait(l);
got_timer_lock = false;
}
}
@@ -197,7 +196,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
// time it takes this thread to finish (and yield lock_) and the
// notified thread to be woken up and to acquire the try_lock. In
// practice, this time is small relative to the total wait time.
- if (got_timer_lock) data_removal__cv_.notify_one();
+ if (got_timer_lock) data_removal__cv_.NotifyOne();
}
if (!is_cancelled_) {
@@ -213,7 +212,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
<< " batch_size=" << batch_size << "\n";
batch_queue_.push_back(make_pair(batch_size, batch));
recvr_->num_buffered_bytes_.Add(batch_size);
- data_arrival_cv_.notify_one();
+ data_arrival_cv_.NotifyOne();
}
}
@@ -225,7 +224,7 @@ void DataStreamRecvr::SenderQueue::DecrementSenders() {
<< recvr_->fragment_instance_id()
<< " node_id=" << recvr_->dest_node_id()
<< " #senders=" << num_remaining_senders_;
- if (num_remaining_senders_ == 0) data_arrival_cv_.notify_one();
+ if (num_remaining_senders_ == 0) data_arrival_cv_.NotifyOne();
}
void DataStreamRecvr::SenderQueue::Cancel() {
@@ -239,8 +238,8 @@ void DataStreamRecvr::SenderQueue::Cancel() {
}
// Wake up all threads waiting to produce/consume batches. They will all
// notice that the stream is cancelled and handle it.
- data_arrival_cv_.notify_all();
- data_removal__cv_.notify_all();
+ data_arrival_cv_.NotifyAll();
+ data_removal__cv_.NotifyAll();
}
void DataStreamRecvr::SenderQueue::Close() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index dc26a23..ed156dc 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -33,6 +33,7 @@
#include "runtime/mem-tracker.h"
#include "runtime/backend-client.h"
#include "util/aligned-new.h"
+#include "util/condition-variable.h"
#include "util/debug-util.h"
#include "util/network-util.h"
#include "util/thread-pool.h"
@@ -45,7 +46,6 @@
#include "common/names.h"
-using boost::condition_variable;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
@@ -133,7 +133,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
// TODO: if the order of row batches does not matter, we can consider increasing
// the number of threads.
ThreadPool<TRowBatch*> rpc_thread_; // sender thread.
- condition_variable rpc_done_cv_; // signaled when rpc_in_flight_ is set to true.
+ ConditionVariable rpc_done_cv_; // signaled when rpc_in_flight_ is set to true.
mutex rpc_thread_lock_; // Lock with rpc_done_cv_ protecting rpc_in_flight_
bool rpc_in_flight_; // true if the rpc_thread_ is busy sending.
@@ -188,7 +188,7 @@ void DataStreamSender::Channel::TransmitData(int thread_id, const TRowBatch* bat
unique_lock<mutex> l(rpc_thread_lock_);
rpc_in_flight_ = false;
}
- rpc_done_cv_.notify_one();
+ rpc_done_cv_.NotifyOne();
}
void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
@@ -239,7 +239,7 @@ void DataStreamSender::Channel::WaitForRpc() {
SCOPED_TIMER(parent_->state_->total_network_send_timer());
unique_lock<mutex> l(rpc_thread_lock_);
while (rpc_in_flight_) {
- rpc_done_cv_.wait(l);
+ rpc_done_cv_.Wait(l);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index a9acca4..138f3f0 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -28,6 +28,7 @@
#include "runtime/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
@@ -51,7 +52,7 @@ struct DiskIoMgr::DiskQueue {
/// thread should shut down. A disk thread will be woken up when there is a reader
/// added to the queue. A reader is only on the queue when it has at least one
/// scan range that is not blocked on available buffers.
- boost::condition_variable work_available;
+ ConditionVariable work_available;
/// list of all request contexts that have work queued on this disk
std::list<DiskIoRequestContext*> request_contexts;
@@ -65,7 +66,7 @@ struct DiskIoMgr::DiskQueue {
request_contexts.end());
request_contexts.push_back(worker);
}
- work_available.notify_all();
+ work_available.NotifyAll();
}
DiskQueue(int id) : disk_id(id) { }
@@ -157,7 +158,7 @@ class DiskIoRequestContext {
// boost doesn't let us dcheck that the reader lock is taken
DCHECK_GT(num_disks_with_ranges_, 0);
if (--num_disks_with_ranges_ == 0) {
- disks_complete_cond_var_.notify_all();
+ disks_complete_cond_var_.NotifyAll();
}
DCHECK(Validate()) << std::endl << DebugString();
}
@@ -289,13 +290,13 @@ class DiskIoRequestContext {
/// We currently populate one range per disk.
/// TODO: think about this some more.
InternalQueue<ScanRange> ready_to_start_ranges_;
- boost::condition_variable ready_to_start_ranges_cv_; // used with lock_
+ ConditionVariable ready_to_start_ranges_cv_; // used with lock_
/// Ranges that are blocked due to back pressure on outgoing buffers.
InternalQueue<ScanRange> blocked_ranges_;
/// Condition variable for UnregisterContext() to wait for all disks to complete
- boost::condition_variable disks_complete_cond_var_;
+ ConditionVariable disks_complete_cond_var_;
/// Struct containing state per disk. See comments in the disk read loop on how
/// they are used.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index 77f332a..afe2b23 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -88,7 +88,7 @@ void DiskIoRequestContext::Cancel(const Status& status) {
// Signal reader and unblock the GetNext/Read thread. That read will fail with
// a cancelled status.
- ready_to_start_ranges_cv_.notify_all();
+ ready_to_start_ranges_cv_.NotifyAll();
}
void DiskIoRequestContext::AddRequestRange(
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index c5c9514..7f0692e 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -67,7 +67,7 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(
blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT;
}
- buffer_ready_cv_.notify_one();
+ buffer_ready_cv_.NotifyOne();
return blocked_on_queue_;
}
@@ -81,7 +81,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
DCHECK(Validate()) << DebugString();
while (ready_buffers_.empty() && !is_cancelled_) {
- buffer_ready_cv_.wait(scan_range_lock);
+ buffer_ready_cv_.Wait(scan_range_lock);
}
if (is_cancelled_) {
@@ -153,7 +153,7 @@ void DiskIoMgr::ScanRange::Cancel(const Status& status) {
is_cancelled_ = true;
status_ = status;
}
- buffer_ready_cv_.notify_all();
+ buffer_ready_cv_.NotifyAll();
CleanupQueuedBuffers();
// For cached buffers, we can't close the range until the cached buffer is returned.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index 658b747..3959194 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include <boost/thread/mutex.hpp>
+
#include "runtime/disk-io-mgr-stress.h"
#include "util/time.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h
index 6d7549e..0a66f2c 100644
--- a/be/src/runtime/disk-io-mgr-stress.h
+++ b/be/src/runtime/disk-io-mgr-stress.h
@@ -22,10 +22,7 @@
#include <memory>
#include <vector>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/unordered_map.hpp>
#include "runtime/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index a6a719f..8d36ea6 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -27,6 +27,7 @@
#include "runtime/disk-io-mgr-stress.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
+#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/disk-info.h"
#include "util/thread.h"
@@ -37,8 +38,6 @@ DECLARE_int32(num_remote_hdfs_io_threads);
DECLARE_int32(num_s3_io_threads);
DECLARE_int32(num_adls_io_threads);
-using boost::condition_variable;
-
const int MIN_BUFFER_SIZE = 512;
const int MAX_BUFFER_SIZE = 1024;
const int LARGE_MEM_LIMIT = 1024 * 1024 * 1024;
@@ -72,7 +71,7 @@ class DiskIoMgrTest : public testing::Test {
{
lock_guard<mutex> l(written_mutex_);
++num_ranges_written_;
- if (num_ranges_written_ == num_writes) writes_done_.notify_one();
+ if (num_ranges_written_ == num_writes) writes_done_.NotifyOne();
}
}
@@ -81,7 +80,7 @@ class DiskIoMgrTest : public testing::Test {
{
lock_guard<mutex> l(written_mutex_);
++num_ranges_written_;
- if (num_ranges_written_ == num_writes) writes_done_.notify_all();
+ if (num_ranges_written_ == num_writes) writes_done_.NotifyAll();
}
}
@@ -179,7 +178,7 @@ class DiskIoMgrTest : public testing::Test {
ObjectPool pool_;
mutex written_mutex_;
- condition_variable writes_done_;
+ ConditionVariable writes_done_;
int num_ranges_written_;
};
@@ -229,7 +228,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
{
unique_lock<mutex> lock(written_mutex_);
- while (num_ranges_written_ < num_ranges) writes_done_.wait(lock);
+ while (num_ranges_written_ < num_ranges) writes_done_.Wait(lock);
}
num_ranges_written_ = 0;
io_mgr.UnregisterContext(writer);
@@ -283,7 +282,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
{
unique_lock<mutex> lock(written_mutex_);
- while (num_ranges_written_ < 2) writes_done_.wait(lock);
+ while (num_ranges_written_ < 2) writes_done_.Wait(lock);
}
num_ranges_written_ = 0;
io_mgr.UnregisterContext(writer);
@@ -342,7 +341,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
{
unique_lock<mutex> lock(written_mutex_);
- while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.wait(lock);
+ while (num_ranges_written_ < num_ranges_before_cancel) writes_done_.Wait(lock);
}
num_ranges_written_ = 0;
io_mgr.UnregisterContext(writer);
@@ -810,7 +809,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
{
unique_lock<mutex> lock(written_mutex_);
- while (num_ranges_written_ < num_write_ranges) writes_done_.wait(lock);
+ while (num_ranges_written_ < num_write_ranges) writes_done_.Wait(lock);
}
threads.join_all();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 54dfc98..8af70f5 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -343,7 +343,7 @@ DiskIoMgr::~DiskIoMgr() {
// to shut_down_ are protected.
unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
}
- disk_queues_[i]->work_available.notify_all();
+ disk_queues_[i]->work_available.NotifyAll();
}
disk_thread_group_.JoinAll();
@@ -469,7 +469,7 @@ void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks
unique_lock<mutex> lock(context->lock_);
DCHECK(context->Validate()) << endl << context->DebugString();
while (context->num_disks_with_ranges_ > 0) {
- context->disks_complete_cond_var_.wait(lock);
+ context->disks_complete_cond_var_.Wait(lock);
}
}
}
@@ -639,7 +639,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
}
if (reader->ready_to_start_ranges_.empty()) {
- reader->ready_to_start_ranges_cv_.wait(reader_lock);
+ reader->ready_to_start_ranges_cv_.Wait(reader_lock);
} else {
*range = reader->ready_to_start_ranges_.Dequeue();
DCHECK(*range != nullptr);
@@ -846,7 +846,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
while (!shut_down_ && disk_queue->request_contexts.empty()) {
// wait if there are no readers on the queue
- disk_queue->work_available.wait(disk_lock);
+ disk_queue->work_available.Wait(disk_lock);
}
if (shut_down_) break;
DCHECK(!disk_queue->request_contexts.empty());
@@ -909,9 +909,9 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
// All the ranges have been started, notify everyone blocked on GetNextRange.
// Only one of them will get work so make sure to return nullptr to the other
// caller threads.
- (*request_context)->ready_to_start_ranges_cv_.notify_all();
+ (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
} else {
- (*request_context)->ready_to_start_ranges_cv_.notify_one();
+ (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index ed33942..e7a7122 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -25,7 +25,6 @@
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_set.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
#include "common/atomic.h"
#include "common/hdfs.h"
@@ -35,6 +34,7 @@
#include "runtime/thread-resource-mgr.h"
#include "util/aligned-new.h"
#include "util/bit-util.h"
+#include "util/condition-variable.h"
#include "util/error-util.h"
#include "util/internal-queue.h"
#include "util/runtime-profile.h"
@@ -563,7 +563,7 @@ class DiskIoMgr : public CacheLineAligned {
/// IO buffers that are queued for this scan range.
/// Condition variable for GetNext
- boost::condition_variable buffer_ready_cv_;
+ ConditionVariable buffer_ready_cv_;
std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
/// Lock that should be taken during hdfs calls. Only one thread (the disk reading
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index a7d3c86..f957dd1 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -230,7 +230,7 @@ Status FragmentInstanceState::Prepare() {
thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
// Make sure the thread started up, otherwise ReportProfileThread() might get into
// a race with StopReportThread().
- while (!report_thread_active_) report_thread_started_cv_.wait(l);
+ while (!report_thread_active_) report_thread_started_cv_.Wait(l);
}
return Status::OK();
@@ -322,28 +322,26 @@ void FragmentInstanceState::ReportProfileThread() {
unique_lock<mutex> l(report_thread_lock_);
// tell Prepare() that we started
report_thread_active_ = true;
- report_thread_started_cv_.notify_one();
+ report_thread_started_cv_.NotifyOne();
// Jitter the reporting time of remote fragments by a random amount between
// 0 and the report_interval. This way, the coordinator doesn't get all the
// updates at once so its better for contention as well as smoother progress
// reporting.
int report_fragment_offset = rand() % FLAGS_status_report_interval;
- boost::system_time timeout = boost::get_system_time()
- + boost::posix_time::seconds(report_fragment_offset);
+ boost::posix_time::seconds wait_duration(report_fragment_offset);
// We don't want to wait longer than it takes to run the entire fragment.
- stop_report_thread_cv_.timed_wait(l, timeout);
+ stop_report_thread_cv_.WaitFor(l, wait_duration);
while (report_thread_active_) {
- boost::system_time timeout = boost::get_system_time()
- + boost::posix_time::seconds(FLAGS_status_report_interval);
+ boost::posix_time::seconds loop_wait_duration(FLAGS_status_report_interval);
// timed_wait can return because the timeout occurred or the condition variable
// was signaled. We can't rely on its return value to distinguish between the
// two cases (e.g. there is a race here where the wait timed out but before grabbing
// the lock, the condition variable was signaled). Instead, we will use an external
// flag, report_thread_active_, to coordinate this.
- stop_report_thread_cv_.timed_wait(l, timeout);
+ stop_report_thread_cv_.WaitFor(l, loop_wait_duration);
if (!report_thread_active_) break;
SendReport(false, Status::OK());
@@ -378,7 +376,7 @@ void FragmentInstanceState::StopReportThread() {
lock_guard<mutex> l(report_thread_lock_);
report_thread_active_ = false;
}
- stop_report_thread_cv_.notify_one();
+ stop_report_thread_cv_.NotifyOne();
report_thread_->Join();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index fa35c6b..f540b56 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -28,6 +28,7 @@
#include "gen-cpp/ImpalaInternalService_types.h"
#include "runtime/row-batch.h"
+#include "util/condition-variable.h"
#include "util/promise.h"
#include "util/runtime-profile.h"
@@ -138,11 +139,11 @@ class FragmentInstanceState {
/// Indicates that profile reporting thread should stop.
/// Tied to report_thread_lock_.
- boost::condition_variable stop_report_thread_cv_;
+ ConditionVariable stop_report_thread_cv_;
/// Indicates that profile reporting thread started.
/// Tied to report_thread_lock_.
- boost::condition_variable report_thread_started_cv_;
+ ConditionVariable report_thread_started_cv_;
/// When the report thread starts, it sets report_thread_active_ to true and signals
/// report_thread_started_cv_. The report thread is shut down by setting
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 8994a5b..99f659a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -233,7 +233,7 @@ AdmissionController::~AdmissionController() {
// Lock to ensure the dequeue thread will see the update to done_
lock_guard<mutex> l(admission_ctrl_lock_);
done_ = true;
- dequeue_cv_.notify_one();
+ dequeue_cv_.NotifyOne();
}
dequeue_thread_->Join();
}
@@ -620,7 +620,7 @@ void AdmissionController::ReleaseQuery(const QuerySchedule& schedule) {
VLOG_RPC << "Released query id=" << schedule.query_id() << " "
<< stats->DebugString();
}
- dequeue_cv_.notify_one();
+ dequeue_cv_.NotifyOne();
}
// Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC.
@@ -648,7 +648,7 @@ void AdmissionController::UpdatePoolStats(
}
UpdateClusterAggregates();
}
- dequeue_cv_.notify_one(); // Dequeue and admit queries on the dequeue thread
+ dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread
}
void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
@@ -821,7 +821,7 @@ void AdmissionController::DequeueLoop() {
while (true) {
unique_lock<mutex> lock(admission_ctrl_lock_);
if (done_) break;
- dequeue_cv_.wait(lock);
+ dequeue_cv_.Wait(lock);
for (const PoolConfigMap::value_type& entry: pool_config_map_) {
const string& pool_name = entry.first;
const TPoolConfig& pool_config = entry.second;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 0cb9f2a..2830bee 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -31,6 +31,7 @@
#include "scheduling/request-pool-service.h"
#include "scheduling/query-schedule.h"
#include "statestore/statestore-subscriber.h"
+#include "util/condition-variable.h"
#include "util/internal-queue.h"
#include "util/thread.h"
@@ -409,7 +410,7 @@ class AdmissionController {
/// Notifies the dequeuing thread that pool stats have changed and it may be
/// possible to dequeue and admit queries.
- boost::condition_variable dequeue_cv_;
+ ConditionVariable dequeue_cv_;
/// If true, tear down the dequeuing thread. This only happens in unit tests.
bool done_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 523e4ae..5d00d24 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -602,12 +602,12 @@ void ClientRequestState::BlockOnWait() {
l.lock();
is_block_on_wait_joining_ = false;
wait_thread_.reset();
- block_on_wait_cv_.notify_all();
+ block_on_wait_cv_.NotifyAll();
} else {
// Another thread is already joining with wait_thread_. Block on the cond-var
// until the Join() executed in the other thread has completed.
do {
- block_on_wait_cv_.wait(l);
+ block_on_wait_cv_.Wait(l);
} while (is_block_on_wait_joining_);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index a5c03f5..968ae04 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -25,6 +25,7 @@
#include "service/child-query.h"
#include "service/impala-server.h"
#include "util/auth-util.h"
+#include "util/condition-variable.h"
#include "util/runtime-profile.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Frontend_types.h"
@@ -254,7 +255,7 @@ class ClientRequestState {
/// Condition variable to make BlockOnWait() thread-safe. One thread joins
/// wait_thread_, and all other threads block on this cv. Used with lock_.
- boost::condition_variable block_on_wait_cv_;
+ ConditionVariable block_on_wait_cv_;
/// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
bool is_block_on_wait_joining_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4b225b4..07a83eb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1123,7 +1123,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
multiset<int32_t>::const_iterator itr = session_timeout_set_.find(session_timeout);
DCHECK(itr != session_timeout_set_.end());
session_timeout_set_.erase(itr);
- session_timeout_cv_.notify_one();
+ session_timeout_cv_.NotifyOne();
}
return Status::OK();
}
@@ -1448,7 +1448,7 @@ void ImpalaServer::CatalogUpdateCallback(
unique_lock<mutex> unique_lock(catalog_version_lock_);
min_subscriber_catalog_topic_version_ = delta.min_subscriber_topic_version;
}
- catalog_version_update_cv_.notify_all();
+ catalog_version_update_cv_.NotifyAll();
}
Status ImpalaServer::ProcessCatalogUpdateResult(
@@ -1487,7 +1487,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
<< " current version: " << catalog_update_info_.catalog_version;
while (catalog_update_info_.catalog_version < min_req_catalog_version &&
catalog_update_info_.catalog_service_id == catalog_service_id) {
- catalog_version_update_cv_.wait(unique_lock);
+ catalog_version_update_cv_.Wait(unique_lock);
}
if (!wait_for_all_subscribers) return Status::OK();
@@ -1502,7 +1502,7 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
<< min_subscriber_catalog_topic_version_;
while (min_subscriber_catalog_topic_version_ < min_req_subscriber_topic_version &&
catalog_update_info_.catalog_service_id == catalog_service_id) {
- catalog_version_update_cv_.wait(unique_lock);
+ catalog_version_update_cv_.Wait(unique_lock);
}
return Status::OK();
}
@@ -1786,7 +1786,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
if (session_timeout <= 0) return;
lock_guard<mutex> l(session_timeout_lock_);
session_timeout_set_.insert(session_timeout);
- session_timeout_cv_.notify_one();
+ session_timeout_cv_.NotifyOne();
}
[[noreturn]] void ImpalaServer::ExpireSessions() {
@@ -1794,12 +1794,10 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
{
unique_lock<mutex> timeout_lock(session_timeout_lock_);
if (session_timeout_set_.empty()) {
- session_timeout_cv_.wait(timeout_lock);
+ session_timeout_cv_.Wait(timeout_lock);
} else {
// Sleep for a second before checking whether an active session can be expired.
- const int64_t SLEEP_TIME_MS = 1000;
- system_time deadline = get_system_time() + milliseconds(SLEEP_TIME_MS);
- session_timeout_cv_.timed_wait(timeout_lock, deadline);
+ session_timeout_cv_.WaitFor(timeout_lock, seconds(1));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f0dfa06..81ec929 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -35,6 +35,7 @@
#include "common/status.h"
#include "service/frontend.h"
#include "service/query-options.h"
+#include "util/condition-variable.h"
#include "util/metrics.h"
#include "util/runtime-profile.h"
#include "util/simple-logger.h"
@@ -800,7 +801,7 @@ class ImpalaServer : public ImpalaServiceIf,
/// session_timeout_thread_ relies on the following conditional variable to wake up
/// on every poll period expiration or when the poll period changes.
- boost::condition_variable session_timeout_cv_;
+ ConditionVariable session_timeout_cv_;
/// map from query id to exec state; ClientRequestState is owned by us and referenced
/// as a shared_ptr to allow asynchronous deletion
@@ -922,7 +923,7 @@ class ImpalaServer : public ImpalaServiceIf,
boost::mutex catalog_version_lock_;
/// Variable to signal when the catalog version has been modified
- boost::condition_variable catalog_version_update_cv_;
+ ConditionVariable catalog_version_update_cv_;
/// Contains details on the version information of a catalog update.
struct CatalogUpdateVersionInfo {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 26aa836..86a0d9e 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -24,7 +24,6 @@
#include <vector>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/unordered_map.hpp>
#include <boost/uuid/uuid_generators.hpp>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a32345c..a4b1b8f 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -19,7 +19,6 @@
#ifndef IMPALA_UTIL_BLOCKING_QUEUE_H
#define IMPALA_UTIL_BLOCKING_QUEUE_H
-#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/scoped_ptr.hpp>
#include <deque>
@@ -141,12 +140,11 @@ class BlockingQueue : public CacheLineAligned {
boost::unique_lock<boost::mutex> write_lock(put_lock_);
boost::system_time wtime = boost::get_system_time() +
boost::posix_time::microseconds(timeout_micros);
- const struct timespec timeout = boost::detail::to_timespec(wtime);
bool notified = true;
while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
timer.Start();
// Wait until we're notified or until the timeout expires.
- notified = put_cv_.TimedWait(write_lock, &timeout);
+ notified = put_cv_.WaitUntil(write_lock, wtime);
timer.Stop();
}
total_put_wait_time_ += timer.ElapsedTime();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
index b0b1090..c1a1e56 100644
--- a/be/src/util/condition-variable.h
+++ b/be/src/util/condition-variable.h
@@ -20,6 +20,7 @@
#include <boost/thread/pthread/timespec.hpp>
#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread_time.hpp>
#include <pthread.h>
#include <unistd.h>
@@ -34,7 +35,7 @@ class ConditionVariable {
~ConditionVariable() { pthread_cond_destroy(&cv_); }
/// Wait indefinitely on the condition variable until it's notified.
- inline void Wait(boost::unique_lock<boost::mutex>& lock) {
+ void Wait(boost::unique_lock<boost::mutex>& lock) {
DCHECK(lock.owns_lock());
pthread_mutex_t* mutex = lock.mutex()->native_handle();
pthread_cond_wait(&cv_, mutex);
@@ -43,18 +44,36 @@ class ConditionVariable {
/// Wait until the condition variable is notified or 'timeout' has passed.
/// Returns true if the condition variable is notified before the absolute timeout
/// specified in 'timeout' has passed. Returns false otherwise.
- inline bool TimedWait(boost::unique_lock<boost::mutex>& lock,
- const struct timespec* timeout) {
+ bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
+ const timespec& abs_time) {
DCHECK(lock.owns_lock());
pthread_mutex_t* mutex = lock.mutex()->native_handle();
- return pthread_cond_timedwait(&cv_, mutex, timeout) == 0;
+ return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0;
+ }
+
+ /// Wait until the condition variable is notified or 'abs_time' has passed.
+ /// Returns true if the condition variable is notified before the absolute timeout
+ /// specified in 'abs_time' has passed. Returns false otherwise.
+ bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
+ const boost::system_time& abs_time) {
+ return WaitUntil(lock, to_timespec(abs_time));
+ }
+
+ /// Wait until the condition variable is notified or until the time specified in
+ /// 'wait_duration' has elapsed.
+ /// Returns true if the condition variable is notified in time.
+ /// Returns false otherwise.
+ template <typename duration_type>
+ bool WaitFor(boost::unique_lock<boost::mutex>& lock,
+ const duration_type& wait_duration) {
+ return WaitUntil(lock, to_timespec(boost::get_system_time() + wait_duration));
}
/// Notify a single waiter on this condition variable.
- inline void NotifyOne() { pthread_cond_signal(&cv_); }
+ void NotifyOne() { pthread_cond_signal(&cv_); }
/// Notify all waiters on this condition variable.
- inline void NotifyAll() { pthread_cond_broadcast(&cv_); }
+ void NotifyAll() { pthread_cond_broadcast(&cv_); }
private:
pthread_cond_t cv_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/util/promise.h b/be/src/util/promise.h
index c6b8f15..5de2d13 100644
--- a/be/src/util/promise.h
+++ b/be/src/util/promise.h
@@ -21,6 +21,7 @@
#include <algorithm>
#include <boost/thread.hpp>
+#include "util/condition-variable.h"
#include "util/time.h"
#include "common/atomic.h"
#include "common/logging.h"
@@ -53,9 +54,9 @@ class Promise {
/// p.get();
/// }
/// < promise object gets destroyed >
- /// Calling notify_all() with the val_lock_ guarantees that the thread calling
+ /// Calling NotifyAll() with the val_lock_ guarantees that the thread calling
/// Set() is done and the promise is safe to delete.
- val_set_cond_.notify_all();
+ val_set_cond_.NotifyAll();
}
/// Blocks until a value is set, and then returns a reference to that value. Once Get()
@@ -63,7 +64,7 @@ class Promise {
const T& Get() {
boost::unique_lock<boost::mutex> l(val_lock_);
while (!val_is_set_) {
- val_set_cond_.wait(l);
+ val_set_cond_.Wait(l);
}
return val_;
}
@@ -86,7 +87,7 @@ class Promise {
boost::posix_time::microseconds wait_time =
boost::posix_time::microseconds(std::max<int64_t>(
1, timeout_micros - (now - start)));
- val_set_cond_.timed_wait(l, wait_time);
+ val_set_cond_.WaitFor(l, wait_time);
now = MonotonicMicros();
}
*timed_out = !val_is_set_;
@@ -102,7 +103,7 @@ class Promise {
private:
/// These variables deal with coordination between consumer and producer, and protect
/// access to val_;
- boost::condition_variable val_set_cond_;
+ ConditionVariable val_set_cond_;
bool val_is_set_;
boost::mutex val_lock_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1673e726/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 4b5dbf0..fedba58 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -24,6 +24,7 @@
#include <boost/bind/mem_fn.hpp>
#include "util/aligned-new.h"
+#include "util/condition-variable.h"
#include "util/thread.h"
namespace impala {
@@ -135,7 +136,7 @@ class ThreadPool : public CacheLineAligned {
// If the ThreadPool is not initialized, then the queue must be empty.
DCHECK(initialized_ || work_queue_.Size() == 0);
while (work_queue_.Size() != 0) {
- empty_cv_.wait(l);
+ empty_cv_.Wait(l);
}
}
Shutdown();
@@ -156,7 +157,7 @@ class ThreadPool : public CacheLineAligned {
/// GetSize() and wait()'ing when the condition variable is notified.
/// (It will hang if we notify right before calling wait().)
boost::unique_lock<boost::mutex> l(lock_);
- empty_cv_.notify_all();
+ empty_cv_.NotifyAll();
}
}
}
@@ -200,7 +201,7 @@ class ThreadPool : public CacheLineAligned {
bool shutdown_ = false;
/// Signalled when the queue becomes empty
- boost::condition_variable empty_cv_;
+ ConditionVariable empty_cv_;
};
/// Utility thread-pool that accepts callable work items, and simply invokes them.