You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/03 06:51:39 UTC

[kudu] 02/02: util: deadline for BlockingQueue::Blocking{Put,Get}

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e2301c04e40eb55d81435327efa44ccefccc807d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Feb 28 19:11:11 2020 -0800

    util: deadline for BlockingQueue::Blocking{Put,Get}
    
    Drawing inspiration from BlockingDrainTo(), this adds a deadline to
    BlockingPut() and BlockingGet(), with the same error semantics.
    
    This also updates SubprocessServer to use BlockingPut() instead of
    manual deadline-checking.
    
    Change-Id: If32bad5667195d72eeb03e4942751824d4dadb7a
    Reviewed-on: http://gerrit.cloudera.org:8080/15324
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log.cc                          |   2 +-
 src/kudu/integration-tests/linked_list-test-util.h |   2 +-
 src/kudu/subprocess/server.cc                      |  33 +------
 src/kudu/subprocess/server.h                       |   4 +-
 src/kudu/subprocess/subprocess_server-test.cc      |   4 +-
 src/kudu/util/blocking_queue-test.cc               | 106 ++++++++++++++++++---
 src/kudu/util/blocking_queue.h                     |  78 +++++++++------
 7 files changed, 152 insertions(+), 77 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index f9a6018..ed6057a 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -856,7 +856,7 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
 
   entry_batch->set_callback(callback);
   TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
-  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()))) {
+  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
     return kLogShutdownStatus;
   }
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 5547f66..1461a2a 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -288,7 +288,7 @@ class ScopedRowUpdater {
     CHECK_OK(session->SetFlushMode(client::KuduSession::AUTO_FLUSH_BACKGROUND));
 
     int64_t next_key;
-    while (to_update_.BlockingGet(&next_key)) {
+    while (to_update_.BlockingGet(&next_key).ok()) {
       std::unique_ptr<client::KuduUpdate> update(table_->NewUpdate());
       CHECK_OK(update->mutable_row()->SetInt64(kKeyColumnName, next_key));
       CHECK_OK(update->mutable_row()->SetBool(kUpdatedColumnName, true));
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 04fd71c..c8dc754 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -33,7 +33,6 @@
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/util/async_util.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
@@ -131,7 +130,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   Synchronizer sync;
   auto cb = sync.AsStdStatusCallback();
   auto call = make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_);
-  RETURN_NOT_OK(QueueCall(call));
+  RETURN_NOT_OK_PREPEND(outbound_call_queue_.BlockingPut(call, call->deadline()),
+                        "couldn't enqueue call");
   return sync.Wait();
 }
 
@@ -186,7 +186,7 @@ void SubprocessServer::ReceiveMessagesThread() {
     // subprocess.
     DCHECK(s.ok());
     WARN_NOT_OK(s, "failed to receive response from the subprocess");
-    if (s.ok() && !inbound_response_queue_.BlockingPut(response)) {
+    if (s.ok() && !inbound_response_queue_.BlockingPut(response).ok()) {
       // The queue has been shut down and we should shut down too.
       DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "failed to put response onto inbound queue";
@@ -290,32 +290,5 @@ void SubprocessServer::SendMessagesThread() {
   LOG(INFO) << "outbound queue shut down: " << s.ToString();
 }
 
-Status SubprocessServer::QueueCall(const shared_ptr<SubprocessCall>& call) {
-  if (MonoTime::Now() > call->deadline()) {
-    return Status::TimedOut("timed out before queueing call");
-  }
-
-  do {
-    QueueStatus queue_status = outbound_call_queue_.Put(call);
-    switch (queue_status) {
-      case QUEUE_SUCCESS:
-        return Status::OK();
-      case QUEUE_SHUTDOWN:
-        return Status::ServiceUnavailable("outbound queue shutting down");
-      case QUEUE_FULL: {
-        // If we still have more time allotted for this call, wait for a bit
-        // and try again; otherwise, time out.
-        if (MonoTime::Now() > call->deadline()) {
-          return Status::TimedOut("timed out trying to queue call");
-        }
-        closing_.WaitFor(
-            MonoDelta::FromMilliseconds(FLAGS_subprocess_queue_full_retry_ms));
-      }
-    }
-  } while (true);
-
-  return Status::OK();
-}
-
 } // namespace subprocess
 } // namespace kudu
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5afc3e9..5e9d625 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -192,8 +192,8 @@ class SubprocessServer {
   // Initialize the server, starting the subprocess and worker threads.
   Status Init() WARN_UNUSED_RESULT;
 
-  // Synchronously send a request to the subprocess and populate 'resp' with
-  // contents returned from the subprocess, or return an error if anything
+  // Synchronously sends a request to the subprocess and populates 'resp' with
+  // contents returned from the subprocess, or returns an error if anything
   // failed or timed out along the way.
   Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT;
 
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index 53d3824..d81f1b4 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -176,7 +176,7 @@ TEST_F(SubprocessServerTest, TestTimeoutBeforeQueueing) {
   SubprocessResponsePB response;
   Status s = server_->Execute(&request, &response);
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "timed out before queueing call");
+  ASSERT_STR_CONTAINS(s.ToString(), "couldn't enqueue call");
 }
 
 // Test when we try sending too many requests at once.
@@ -213,7 +213,7 @@ TEST_F(SubprocessServerTest, TestTimeoutWhileQueueingCalls) {
   bool has_timeout_when_queueing = false;
   for (const auto& s : results) {
     if (s.IsTimedOut() &&
-        s.ToString().find("timed out trying to queue call") != string::npos) {
+        s.ToString().find("couldn't enqueue call") != string::npos) {
       has_timeout_when_queueing = true;
     }
   }
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index a2271ff..bc897e1 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/blocking_queue.h"
+
 #include <cstddef>
 #include <cstdint>
+#include <list>
 #include <map>
 #include <string>
 #include <thread>
@@ -26,9 +29,9 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/countdown_latch.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
@@ -49,11 +52,11 @@ void InsertSomeThings() {
 TEST(BlockingQueueTest, Test1) {
   thread inserter_thread(InsertSomeThings);
   int32_t i;
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(1, i);
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(2, i);
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(3, i);
   inserter_thread.join();
 }
@@ -79,6 +82,83 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
   ASSERT_TRUE(s.IsAborted());
 }
 
+TEST(BlockingQueueTest, TestBlockingPut) {
+  const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+  const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+  BlockingQueue<int32_t> test_queue(2);
+
+  // First, a trivial check that we don't do anything if our deadline has
+  // already passed.
+  Status s = test_queue.BlockingPut(1, MonoTime::Now() - kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Now put a couple elements onto the queue.
+  ASSERT_OK(test_queue.BlockingPut(1));
+  ASSERT_OK(test_queue.BlockingPut(2));
+
+  // We're at capacity, so further puts should time out...
+  s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // ... until space is freed up before the deadline.
+  thread t([&] {
+    SleepFor(kEvenShorterTimeout);
+    int out;
+    ASSERT_OK(test_queue.BlockingGet(&out));
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+  ASSERT_OK(test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout));
+
+  // If we shut down, we shouldn't be able to put more elements onto the queue.
+  test_queue.Shutdown();
+  s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
+TEST(BlockingQueueTest, TestBlockingGet) {
+  const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+  const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+  BlockingQueue<int32_t> test_queue(2);
+  ASSERT_OK(test_queue.BlockingPut(1));
+  ASSERT_OK(test_queue.BlockingPut(2));
+
+  // Test that if we have stuff in our queue, regardless of deadlines, we'll be
+  // able to get them out.
+  int32_t val = 0;
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout));
+  ASSERT_EQ(1, val);
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(2, val);
+
+  // But without stuff in the queue, we'll time out...
+  Status s = test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // ... until new elements show up.
+  thread t([&] {
+    SleepFor(kEvenShorterTimeout);
+    ASSERT_OK(test_queue.BlockingPut(3));
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(3, val);
+
+  // If we shut down with stuff in our queue, we'll continue to return those
+  // elements. Otherwise, we'll return an error.
+  ASSERT_OK(test_queue.BlockingPut(4));
+  test_queue.Shutdown();
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(4, val);
+  s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
 // Test that, when the queue is shut down with elements still pending,
 // Drain still returns OK until the elements are all gone.
 TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
@@ -91,7 +171,7 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
 
   // Get() should still return an element.
   int i;
-  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_OK(q.BlockingGet(&i));
   ASSERT_EQ(1, i);
 
   // Drain should still return OK, since it yielded elements.
@@ -102,7 +182,8 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
   // Now that it's empty, it should return Aborted.
   Status s = q.BlockingDrainTo(&out);
   ASSERT_TRUE(s.IsAborted()) << s.ToString();
-  ASSERT_FALSE(q.BlockingGet(&i));
+  s = q.BlockingGet(&i);
+  ASSERT_FALSE(s.ok()) << s.ToString();
 }
 
 TEST(BlockingQueueTest, TestTooManyInsertions) {
@@ -154,9 +235,10 @@ TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
   test_queue.Shutdown();
   ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
   int64_t i;
-  ASSERT_TRUE(test_queue.BlockingGet(&i));
+  ASSERT_OK(test_queue.BlockingGet(&i));
   ASSERT_EQ(123, i);
-  ASSERT_FALSE(test_queue.BlockingGet(&i));
+  Status s = test_queue.BlockingGet(&i);
+  ASSERT_FALSE(s.ok()) << s.ToString();
 }
 
 TEST(BlockingQueueTest, TestGscopedPtrMethods) {
@@ -164,7 +246,7 @@ TEST(BlockingQueueTest, TestGscopedPtrMethods) {
   gscoped_ptr<int> input_int(new int(123));
   ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
   gscoped_ptr<int> output_int;
-  ASSERT_TRUE(test_queue.BlockingGet(&output_int));
+  ASSERT_OK(test_queue.BlockingGet(&output_int));
   ASSERT_EQ(123, *output_int.get());
   test_queue.Shutdown();
 }
@@ -187,7 +269,7 @@ class MultiThreadTest {
     sync_latch_.CountDown();
     sync_latch_.Wait();
     for (int i = 0; i < blocking_puts_; i++) {
-      ASSERT_TRUE(queue_.BlockingPut(arg));
+      ASSERT_OK(queue_.BlockingPut(arg));
     }
     MutexLock guard(lock_);
     if (--num_inserters_ == 0) {
@@ -198,8 +280,8 @@ class MultiThreadTest {
   void RemoverThread() {
     for (int i = 0; i < puts_ + blocking_puts_; i++) {
       int32_t arg = 0;
-      bool got = queue_.BlockingGet(&arg);
-      if (!got) {
+      Status s = queue_.BlockingGet(&arg);
+      if (!s.ok()) {
         arg = -1;
       }
       MutexLock guard(lock_);
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 7331c12..a26b3aa 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
-#define KUDU_UTIL_BLOCKING_QUEUE_H
+#pragma once
 
 #include <list>
 #include <string>
@@ -70,9 +69,17 @@ class BlockingQueue {
         << "BlockingQueue holds bare pointers at destruction time";
   }
 
-  // Get an element from the queue.  Returns false if we were shut down prior to
-  // getting the element.
-  bool BlockingGet(T *out) {
+  // Gets an element from the queue; if the queue is empty, blocks until the
+  // queue becomes non-empty, or until the deadline passes.
+  //
+  // If the queue has been shut down but there are still elements in the queue,
+  // it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingGet(T *out, MonoTime deadline = MonoTime()) {
     MutexLock l(lock_);
     while (true) {
       if (!list_.empty()) {
@@ -80,25 +87,26 @@ class BlockingQueue {
         list_.pop_front();
         decrement_size_unlocked(*out);
         not_full_.Signal();
-        return true;
+        return Status::OK();
       }
       if (shutdown_) {
-        return false;
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
       }
-      not_empty_.Wait();
     }
   }
 
   // Get an element from the queue.  Returns false if the queue is empty and
   // we were shut down prior to getting the element.
-  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
-    T t = NULL;
-    bool got_element = BlockingGet(&t);
-    if (!got_element) {
-      return false;
-    }
+  Status BlockingGet(gscoped_ptr<T_VAL> *out, MonoTime deadline = MonoTime()) {
+    T t = nullptr;
+    RETURN_NOT_OK(BlockingGet(&t, deadline));
     out->reset(t);
-    return true;
+    return Status::OK();
   }
 
   // Get all elements from the queue and append them to a vector.
@@ -168,34 +176,47 @@ class BlockingQueue {
     return s;
   }
 
-  // Gets an element for the queue; if the queue is full, blocks until
-  // space becomes available. Returns false if we were shutdown prior
-  // to enqueueing the element.
-  bool BlockingPut(const T& val) {
+  // Puts an element onto the queue; if the queue is full, blocks until space
+  // becomes available, or until the deadline passes.
+  //
+  // NOTE: unlike BlockingGet() and BlockingDrainTo(), which succeed as long as
+  // there are elements in the queue (regardless of deadline), if the deadline
+  // has passed, an error will be returned even if there is space in the queue.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingPut(const T& val, MonoTime deadline = MonoTime()) {
+    if (deadline.Initialized() && MonoTime::Now() > deadline) {
+      return Status::TimedOut("");
+    }
     MutexLock l(lock_);
     while (true) {
       if (shutdown_) {
-        return false;
+        return Status::Aborted("");
       }
       if (size_ < max_size_) {
         list_.push_back(val);
         increment_size_unlocked(val);
         l.Unlock();
         not_empty_.Signal();
-        return true;
+        return Status::OK();
+      }
+      if (!deadline.Initialized()) {
+        not_full_.Wait();
+      } else if (PREDICT_FALSE(!not_full_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
       }
-      not_full_.Wait();
     }
   }
 
   // Same as other BlockingPut() overload above. If the element was
   // enqueued, gscoped_ptr releases its contents.
-  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
-    bool ret = Put(val->get());
-    if (ret) {
-      ignore_result(val->release());
-    }
-    return ret;
+  Status BlockingPut(gscoped_ptr<T_VAL>* val, MonoTime deadline = MonoTime()) {
+    RETURN_NOT_OK(BlockingPut(val->get(), deadline));
+    ignore_result(val->release());
+    return Status::OK();
   }
 
   // Shut down the queue.
@@ -253,4 +274,3 @@ class BlockingQueue {
 
 } // namespace kudu
 
-#endif