You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/05/11 03:25:48 UTC
[kudu] 03/03: [util] introduce Synchronizer::WaitUntil()
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 762f0fcc30803c20158cc1a75f6eac2fa530a44f
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon May 6 19:10:39 2019 -0700
[util] introduce Synchronizer::WaitUntil()
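Added the Synchronizer::WaitUntil() method so that callers which already hold
a MonoTime deadline can wait on it directly, instead of first converting it
into a MonoDelta (e.g. deadline - MonoTime::Now()) to pass to WaitFor().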
Updated corresponding tests for Synchronizer as well.
Change-Id: I00586ac1ba49494ff08abae0d452ab9286a3e56f
Reviewed-on: http://gerrit.cloudera.org:8080/13255
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>
---
src/kudu/master/hms_notification_log_listener.cc | 2 +-
src/kudu/util/async_util-test.cc | 90 +++++++++++++++++-------
src/kudu/util/async_util.h | 9 ++-
3 files changed, 75 insertions(+), 26 deletions(-)
diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index b34d233..2237a4b 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline)
wake_up_cv_.Signal();
}
- RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()),
+ RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline),
"failed to wait for Hive Metastore notification log listener to catch up");
return Status::OK();
}
diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc
index 5cb7a63..91f2baa 100644
--- a/src/kudu/util/async_util-test.cc
+++ b/src/kudu/util/async_util-test.cc
@@ -28,6 +28,7 @@
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/callback.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest {
// Set up an alarm to fail the test in case of deadlock.
alarm(30);
}
- ~AsyncUtilTest() {
+ virtual ~AsyncUtilTest() {
// Disable the alarm on test exit.
alarm(0);
}
@@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) {
}
}
-TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) {
- thread waiter;
- {
- Synchronizer sync;
- auto cb = sync.AsStatusCallback();
- waiter = thread([cb] {
- SleepFor(MonoDelta::FromMilliseconds(5));
- cb.Run(Status::OK());
- });
- ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000)));
- }
- waiter.join();
+// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil().
+enum class TimedWaitFlavor {
+ WaitFor,
+ WaitUntil,
+};
- {
- Synchronizer sync;
- auto cb = sync.AsStatusCallback();
- waiter = thread([cb] {
- SleepFor(MonoDelta::FromMilliseconds(1000));
- cb.Run(Status::OK());
- });
- ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut());
+class AsyncUtilTimedWaitTest:
+ public AsyncUtilTest,
+ public ::testing::WithParamInterface<TimedWaitFlavor> {
+};
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) {
+ const auto kWaitInterval = MonoDelta::FromMilliseconds(1000);
+
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ auto waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ cb.Run(Status::OK());
+ });
+ SCOPED_CLEANUP({
+ waiter.join();
+ });
+ const auto mode = GetParam();
+ switch (mode) {
+ case TimedWaitFlavor::WaitFor:
+ ASSERT_OK(sync.WaitFor(kWaitInterval));
+ break;
+ case TimedWaitFlavor::WaitUntil:
+ ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval));
+ break;
+ default:
+ FAIL() << "unsupported wait mode " << static_cast<int>(mode);
+ break;
}
+}
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) {
+ const auto kWaitInterval = MonoDelta::FromMilliseconds(5);
- // Waiting on the thread gives TSAN to check that no thread safety issues
- // occurred.
- waiter.join();
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ auto waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(1000));
+ cb.Run(Status::OK());
+ });
+ SCOPED_CLEANUP({
+ waiter.join();
+ });
+ const auto mode = GetParam();
+ switch (mode) {
+ case TimedWaitFlavor::WaitFor:
+ ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut());
+ break;
+ case TimedWaitFlavor::WaitUntil:
+ ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut());
+ break;
+ default:
+ FAIL() << "unsupported wait mode " << static_cast<int>(mode);
+ break;
+ }
}
+
+INSTANTIATE_TEST_CASE_P(WaitFlavors,
+ AsyncUtilTimedWaitTest,
+ ::testing::Values(TimedWaitFlavor::WaitFor,
+ TimedWaitFlavor::WaitUntil));
+
} // namespace kudu
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 338c6c2..61621d6 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -44,7 +44,7 @@ namespace kudu {
class Synchronizer {
public:
Synchronizer()
- : data_(std::make_shared<Data>()) {
+ : data_(std::make_shared<Data>()) {
}
void StatusCB(const Status& status) {
@@ -71,6 +71,13 @@ class Synchronizer {
return data_->status;
}
+ Status WaitUntil(const MonoTime& deadline) const {
+ if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) {
+ return Status::TimedOut("timed out while waiting for the callback to be called");
+ }
+ return data_->status;
+ }
+
void Reset() {
data_->latch.Reset(1);
}