You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/09 16:59:32 UTC
[13/21] impala git commit: IMPALA-6449: Use CLOCK_MONOTONIC in
ConditionVariable
IMPALA-6449: Use CLOCK_MONOTONIC in ConditionVariable
ConditionVariable is a thin wrapper around pthread_cond_*.
Currently, pthread_cond_timedwait() uses the default clock attribute,
CLOCK_REALTIME. This is susceptible to adjustments of the system
clock from various sources such as NTP, so time may go backward.
This change fixes the problem by switching to CLOCK_MONOTONIC, so
time will be monotonic, although the frequency of the clock ticks
may still be adjusted by NTP. Ideally, we would use CLOCK_MONOTONIC_RAW,
but it's available only on Linux kernel 2.6.28 or later. This change
also gets rid of some uses of boost::get_system_time(), which suffers
from the same problem.
Change-Id: I81611cfd5e7c5347203fe7fa6b0f615602257f87
Reviewed-on: http://gerrit.cloudera.org:8080/9158
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ee74a627
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ee74a627
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ee74a627
Branch: refs/heads/2.x
Commit: ee74a6277dec51fd1cd32acfdfb7821174451c03
Parents: d747670
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Jan 29 18:07:28 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 8 07:01:53 2018 +0000
----------------------------------------------------------------------
be/src/rpc/thrift-server.cc | 4 +--
be/src/runtime/fragment-instance-state.cc | 7 ++---
be/src/service/impala-server.cc | 2 +-
be/src/util/blocking-queue.h | 6 ++--
be/src/util/condition-variable.h | 40 +++++++++++++-------------
be/src/util/promise.h | 8 ++----
be/src/util/time.h | 13 +++++++++
7 files changed, 44 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ded710e..48fb1b9 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -165,8 +165,8 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
&ThriftServer::ThriftServerEventProcessor::Supervise, this,
&thrift_server_->server_thread_));
- system_time deadline = get_system_time() +
- posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
+ timespec deadline;
+ TimeFromNowMillis(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS, &deadline);
// Loop protects against spurious wakeup. Locks provide necessary fences to ensure
// visibility.
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 16b4a7e..ad9e99e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -341,19 +341,16 @@ void FragmentInstanceState::ReportProfileThread() {
// updates at once so its better for contention as well as smoother progress
// reporting.
int report_fragment_offset = rand() % FLAGS_status_report_interval;
- boost::posix_time::seconds wait_duration(report_fragment_offset);
// We don't want to wait longer than it takes to run the entire fragment.
- stop_report_thread_cv_.WaitFor(l, wait_duration);
+ stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_SEC);
while (report_thread_active_) {
- boost::posix_time::seconds loop_wait_duration(FLAGS_status_report_interval);
-
// timed_wait can return because the timeout occurred or the condition variable
// was signaled. We can't rely on its return value to distinguish between the
// two cases (e.g. there is a race here where the wait timed out but before grabbing
// the lock, the condition variable was signaled). Instead, we will use an external
// flag, report_thread_active_, to coordinate this.
- stop_report_thread_cv_.WaitFor(l, loop_wait_duration);
+ stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval * MICROS_PER_SEC);
if (!report_thread_active_) break;
SendReport(false, Status::OK());
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 0c5f75b..cf5f5fb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1772,7 +1772,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
session_timeout_cv_.Wait(timeout_lock);
} else {
// Sleep for a second before checking whether an active session can be expired.
- session_timeout_cv_.WaitFor(timeout_lock, seconds(1));
+ session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a4b1b8f..1dd90d5 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -138,13 +138,13 @@ class BlockingQueue : public CacheLineAligned {
bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
MonotonicStopWatch timer;
boost::unique_lock<boost::mutex> write_lock(put_lock_);
- boost::system_time wtime = boost::get_system_time() +
- boost::posix_time::microseconds(timeout_micros);
+ timespec abs_time;
+ TimeFromNowMicros(timeout_micros, &abs_time);
bool notified = true;
while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
timer.Start();
// Wait until we're notified or until the timeout expires.
- notified = put_cv_.WaitUntil(write_lock, wtime);
+ notified = put_cv_.WaitUntil(write_lock, abs_time);
timer.Stop();
}
total_put_wait_time_ += timer.ElapsedTime();
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
index c1a1e56..e463790 100644
--- a/be/src/util/condition-variable.h
+++ b/be/src/util/condition-variable.h
@@ -24,13 +24,23 @@
#include <pthread.h>
#include <unistd.h>
+#include "util/time.h"
+
namespace impala {
/// Simple wrapper around POSIX pthread condition variable. This has lower overhead than
/// boost's implementation as it doesn't implement boost thread interruption.
class ConditionVariable {
public:
- ConditionVariable() { pthread_cond_init(&cv_, NULL); }
+ ConditionVariable() {
+ pthread_condattr_t attrs;
+ int retval = pthread_condattr_init(&attrs);
+ DCHECK_EQ(0, retval);
+ pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
+ retval = pthread_cond_init(&cv_, &attrs);
+ DCHECK_EQ(0, retval);
+ pthread_condattr_destroy(&attrs);
+ }
~ConditionVariable() { pthread_cond_destroy(&cv_); }
@@ -41,32 +51,22 @@ class ConditionVariable {
pthread_cond_wait(&cv_, mutex);
}
- /// Wait until the condition variable is notified or 'timeout' has passed.
+ /// Wait until the condition variable is notified or 'abs_time' has passed.
/// Returns true if the condition variable is notified before the absolute timeout
- /// specified in 'timeout' has passed. Returns false otherwise.
- bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
- const timespec& abs_time) {
+ /// specified in 'abs_time' has passed. Returns false otherwise.
+ bool WaitUntil(boost::unique_lock<boost::mutex>& lock, const timespec& abs_time) {
DCHECK(lock.owns_lock());
pthread_mutex_t* mutex = lock.mutex()->native_handle();
return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0;
}
- /// Wait until the condition variable is notified or 'abs_time' has passed.
- /// Returns true if the condition variable is notified before the absolute timeout
- /// specified in 'abs_time' has passed. Returns false otherwise.
- bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
- const boost::system_time& abs_time) {
- return WaitUntil(lock, to_timespec(abs_time));
- }
-
- /// Wait until the condition variable is notified or have waited for the time
- /// specified in 'wait_duration'.
- /// Returns true if the condition variable is notified in time.
+ /// Wait until the condition variable is notified or 'duration_us' microseconds
+ /// have passed. Returns true if the condition variable is notified in time.
/// Returns false otherwise.
- template <typename duration_type>
- bool WaitFor(boost::unique_lock<boost::mutex>& lock,
- const duration_type& wait_duration) {
- return WaitUntil(lock, to_timespec(boost::get_system_time() + wait_duration));
+ bool WaitFor(boost::unique_lock<boost::mutex>& lock, int64_t duration_us) {
+ timespec deadline;
+ TimeFromNowMicros(duration_us, &deadline);
+ return WaitUntil(lock, deadline);
}
/// Notify a single waiter on this condition variable.
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/util/promise.h b/be/src/util/promise.h
index 5de2d13..c93d9f2 100644
--- a/be/src/util/promise.h
+++ b/be/src/util/promise.h
@@ -77,17 +77,15 @@ class Promise {
/// timed_out: Indicates whether Get() returned due to timeout. Must be non-NULL.
const T& Get(int64_t timeout_millis, bool* timed_out) {
DCHECK_GT(timeout_millis, 0);
- int64_t timeout_micros = timeout_millis * 1000;
+ int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
DCHECK(timed_out != NULL);
boost::unique_lock<boost::mutex> l(val_lock_);
int64_t start;
int64_t now;
now = start = MonotonicMicros();
while (!val_is_set_ && (now - start) < timeout_micros) {
- boost::posix_time::microseconds wait_time =
- boost::posix_time::microseconds(std::max<int64_t>(
- 1, timeout_micros - (now - start)));
- val_set_cond_.WaitFor(l, wait_time);
+ int64_t wait_time_micros = std::max<int64_t>(1, timeout_micros - (now - start));
+ val_set_cond_.WaitFor(l, wait_time_micros);
now = MonotonicMicros();
}
*timed_out = !val_is_set_;
http://git-wip-us.apache.org/repos/asf/impala/blob/ee74a627/be/src/util/time.h
----------------------------------------------------------------------
diff --git a/be/src/util/time.h b/be/src/util/time.h
index cef14c8..64dbf9c 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -57,6 +57,19 @@ inline int64_t UnixMillis() {
return GetCurrentTimeMicros() / MICROS_PER_MILLI;
}
+/// Sets 'abs_time' to CLOCK_MONOTONIC now + 'time_us' microseconds; expects time_us >= 0.
+inline void TimeFromNowMicros(int64_t time_us, timespec* abs_time) {
+ clock_gettime(CLOCK_MONOTONIC, abs_time);
+ abs_time->tv_nsec += (time_us % MICROS_PER_SEC) * NANOS_PER_MICRO;
+ abs_time->tv_sec += time_us / MICROS_PER_SEC + abs_time->tv_nsec / NANOS_PER_SEC;
+ abs_time->tv_nsec %= NANOS_PER_SEC;
+}
+
+/// Sets 'abs_time' to CLOCK_MONOTONIC now + 'time_ms' milliseconds; expects time_ms >= 0.
+inline void TimeFromNowMillis(int64_t time_ms, timespec* abs_time) {
+ TimeFromNowMicros(time_ms * MICROS_PER_MILLI, abs_time);
+}
+
/// Returns the number of microseconds that have passed since the Unix epoch. This is
/// affected by manual changes to the system clock but is more suitable for use across
/// a cluster. For more accurate timings on the local host use the monotonic functions