You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/05/11 03:25:48 UTC

[kudu] 03/03: [util] introduce Synchronizer::WaitUntil()

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 762f0fcc30803c20158cc1a75f6eac2fa530a44f
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon May 6 19:10:39 2019 -0700

    [util] introduce Synchronizer::WaitUntil()
    
    Added the Synchronizer::WaitUntil() method so that callers holding a
    MonoTime deadline no longer need to convert it into a MonoDelta.
    
    Updated the corresponding Synchronizer tests as well.
    
    Change-Id: I00586ac1ba49494ff08abae0d452ab9286a3e56f
    Reviewed-on: http://gerrit.cloudera.org:8080/13255
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/master/hms_notification_log_listener.cc |  2 +-
 src/kudu/util/async_util-test.cc                 | 90 +++++++++++++++++-------
 src/kudu/util/async_util.h                       |  9 ++-
 3 files changed, 75 insertions(+), 26 deletions(-)

diff --git a/src/kudu/master/hms_notification_log_listener.cc b/src/kudu/master/hms_notification_log_listener.cc
index b34d233..2237a4b 100644
--- a/src/kudu/master/hms_notification_log_listener.cc
+++ b/src/kudu/master/hms_notification_log_listener.cc
@@ -118,7 +118,7 @@ Status HmsNotificationLogListenerTask::WaitForCatchUp(const MonoTime& deadline)
     wake_up_cv_.Signal();
   }
 
-  RETURN_NOT_OK_PREPEND(synchronizer.WaitFor(deadline - MonoTime::Now()),
+  RETURN_NOT_OK_PREPEND(synchronizer.WaitUntil(deadline),
                         "failed to wait for Hive Metastore notification log listener to catch up");
   return Status::OK();
 }
diff --git a/src/kudu/util/async_util-test.cc b/src/kudu/util/async_util-test.cc
index 5cb7a63..91f2baa 100644
--- a/src/kudu/util/async_util-test.cc
+++ b/src/kudu/util/async_util-test.cc
@@ -28,6 +28,7 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -43,7 +44,7 @@ class AsyncUtilTest : public KuduTest {
     // Set up an alarm to fail the test in case of deadlock.
     alarm(30);
   }
-  ~AsyncUtilTest() {
+  virtual ~AsyncUtilTest() {
     // Disable the alarm on test exit.
     alarm(0);
   }
@@ -99,31 +100,72 @@ TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) {
   }
 }
 
-TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) {
-  thread waiter;
-  {
-    Synchronizer sync;
-    auto cb = sync.AsStatusCallback();
-    waiter = thread([cb] {
-        SleepFor(MonoDelta::FromMilliseconds(5));
-        cb.Run(Status::OK());
-    });
-    ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000)));
-  }
-  waiter.join();
+// Flavors of wait that Synchronizer is capable of: WaitFor() or WaitUntil().
+enum class TimedWaitFlavor {
+  WaitFor,
+  WaitUntil,
+};
 
-  {
-    Synchronizer sync;
-    auto cb = sync.AsStatusCallback();
-    waiter = thread([cb] {
-        SleepFor(MonoDelta::FromMilliseconds(1000));
-        cb.Run(Status::OK());
-    });
-    ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut());
+class AsyncUtilTimedWaitTest:
+    public AsyncUtilTest,
+    public ::testing::WithParamInterface<TimedWaitFlavor> {
+};
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitSuccess) {
+  const auto kWaitInterval = MonoDelta::FromMilliseconds(1000);
+
+  Synchronizer sync;
+  auto cb = sync.AsStatusCallback();
+  auto waiter = thread([cb] {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    cb.Run(Status::OK());
+  });
+  SCOPED_CLEANUP({
+    waiter.join();
+  });
+  const auto mode = GetParam();
+  switch (mode) {
+    case TimedWaitFlavor::WaitFor:
+      ASSERT_OK(sync.WaitFor(kWaitInterval));
+      break;
+    case TimedWaitFlavor::WaitUntil:
+      ASSERT_OK(sync.WaitUntil(MonoTime::Now() + kWaitInterval));
+      break;
+    default:
+      FAIL() << "unsupported wait mode " << static_cast<int>(mode);
+      break;
   }
+}
+
+TEST_P(AsyncUtilTimedWaitTest, SynchronizerTimedWaitTimeout) {
+  const auto kWaitInterval = MonoDelta::FromMilliseconds(5);
 
-  // Waiting on the thread gives TSAN to check that no thread safety issues
-  // occurred.
-  waiter.join();
+  Synchronizer sync;
+  auto cb = sync.AsStatusCallback();
+  auto waiter = thread([cb] {
+    SleepFor(MonoDelta::FromMilliseconds(1000));
+    cb.Run(Status::OK());
+  });
+  SCOPED_CLEANUP({
+    waiter.join();
+  });
+  const auto mode = GetParam();
+  switch (mode) {
+    case TimedWaitFlavor::WaitFor:
+      ASSERT_TRUE(sync.WaitFor(kWaitInterval).IsTimedOut());
+      break;
+    case TimedWaitFlavor::WaitUntil:
+      ASSERT_TRUE(sync.WaitUntil(MonoTime::Now() + kWaitInterval).IsTimedOut());
+      break;
+    default:
+      FAIL() << "unsupported wait mode " << static_cast<int>(mode);
+      break;
+  }
 }
+
+INSTANTIATE_TEST_CASE_P(WaitFlavors,
+                        AsyncUtilTimedWaitTest,
+                        ::testing::Values(TimedWaitFlavor::WaitFor,
+                                          TimedWaitFlavor::WaitUntil));
+
 } // namespace kudu
diff --git a/src/kudu/util/async_util.h b/src/kudu/util/async_util.h
index 338c6c2..61621d6 100644
--- a/src/kudu/util/async_util.h
+++ b/src/kudu/util/async_util.h
@@ -44,7 +44,7 @@ namespace kudu {
 class Synchronizer {
  public:
   Synchronizer()
-    : data_(std::make_shared<Data>()) {
+      : data_(std::make_shared<Data>()) {
   }
 
   void StatusCB(const Status& status) {
@@ -71,6 +71,13 @@ class Synchronizer {
     return data_->status;
   }
 
+  Status WaitUntil(const MonoTime& deadline) const {
+    if (PREDICT_FALSE(!data_->latch.WaitUntil(deadline))) {
+      return Status::TimedOut("timed out while waiting for the callback to be called");
+    }
+    return data_->status;
+  }
+
   void Reset() {
     data_->latch.Reset(1);
   }