You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/03 06:51:39 UTC
[kudu] 02/02: util: deadline for BlockingQueue::Blocking{Put,Get}
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e2301c04e40eb55d81435327efa44ccefccc807d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Feb 28 19:11:11 2020 -0800
util: deadline for BlockingQueue::Blocking{Put,Get}
Drawing inspiration from BlockingDrainTo(), this adds a deadline to
BlockingPut() and BlockingGet(), with the same error semantics.
This also updates SubprocessServer to use of BlockingPut(), instead of
manual deadline-checking.
Change-Id: If32bad5667195d72eeb03e4942751824d4dadb7a
Reviewed-on: http://gerrit.cloudera.org:8080/15324
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
---
src/kudu/consensus/log.cc | 2 +-
src/kudu/integration-tests/linked_list-test-util.h | 2 +-
src/kudu/subprocess/server.cc | 33 +------
src/kudu/subprocess/server.h | 4 +-
src/kudu/subprocess/subprocess_server-test.cc | 4 +-
src/kudu/util/blocking_queue-test.cc | 106 ++++++++++++++++++---
src/kudu/util/blocking_queue.h | 78 +++++++++------
7 files changed, 152 insertions(+), 77 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index f9a6018..ed6057a 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -856,7 +856,7 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
entry_batch->set_callback(callback);
TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
- if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()))) {
+ if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
return kLogShutdownStatus;
}
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 5547f66..1461a2a 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -288,7 +288,7 @@ class ScopedRowUpdater {
CHECK_OK(session->SetFlushMode(client::KuduSession::AUTO_FLUSH_BACKGROUND));
int64_t next_key;
- while (to_update_.BlockingGet(&next_key)) {
+ while (to_update_.BlockingGet(&next_key).ok()) {
std::unique_ptr<client::KuduUpdate> update(table_->NewUpdate());
CHECK_OK(update->mutable_row()->SetInt64(kKeyColumnName, next_key));
CHECK_OK(update->mutable_row()->SetBool(kUpdatedColumnName, true));
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 04fd71c..c8dc754 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -33,7 +33,6 @@
#include "kudu/subprocess/subprocess.pb.h"
#include "kudu/subprocess/subprocess_protocol.h"
#include "kudu/util/async_util.h"
-#include "kudu/util/blocking_queue.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
@@ -131,7 +130,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
Synchronizer sync;
auto cb = sync.AsStdStatusCallback();
auto call = make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_);
- RETURN_NOT_OK(QueueCall(call));
+ RETURN_NOT_OK_PREPEND(outbound_call_queue_.BlockingPut(call, call->deadline()),
+ "couldn't enqueue call");
return sync.Wait();
}
@@ -186,7 +186,7 @@ void SubprocessServer::ReceiveMessagesThread() {
// subprocess.
DCHECK(s.ok());
WARN_NOT_OK(s, "failed to receive response from the subprocess");
- if (s.ok() && !inbound_response_queue_.BlockingPut(response)) {
+ if (s.ok() && !inbound_response_queue_.BlockingPut(response).ok()) {
// The queue has been shut down and we should shut down too.
DCHECK_EQ(0, closing_.count());
LOG(INFO) << "failed to put response onto inbound queue";
@@ -290,32 +290,5 @@ void SubprocessServer::SendMessagesThread() {
LOG(INFO) << "outbound queue shut down: " << s.ToString();
}
-Status SubprocessServer::QueueCall(const shared_ptr<SubprocessCall>& call) {
- if (MonoTime::Now() > call->deadline()) {
- return Status::TimedOut("timed out before queueing call");
- }
-
- do {
- QueueStatus queue_status = outbound_call_queue_.Put(call);
- switch (queue_status) {
- case QUEUE_SUCCESS:
- return Status::OK();
- case QUEUE_SHUTDOWN:
- return Status::ServiceUnavailable("outbound queue shutting down");
- case QUEUE_FULL: {
- // If we still have more time allotted for this call, wait for a bit
- // and try again; otherwise, time out.
- if (MonoTime::Now() > call->deadline()) {
- return Status::TimedOut("timed out trying to queue call");
- }
- closing_.WaitFor(
- MonoDelta::FromMilliseconds(FLAGS_subprocess_queue_full_retry_ms));
- }
- }
- } while (true);
-
- return Status::OK();
-}
-
} // namespace subprocess
} // namespace kudu
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5afc3e9..5e9d625 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -192,8 +192,8 @@ class SubprocessServer {
// Initialize the server, starting the subprocess and worker threads.
Status Init() WARN_UNUSED_RESULT;
- // Synchronously send a request to the subprocess and populate 'resp' with
- // contents returned from the subprocess, or return an error if anything
+ // Synchronously sends a request to the subprocess and populates 'resp' with
+ // contents returned from the subprocess, or returns an error if anything
// failed or timed out along the way.
Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT;
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index 53d3824..d81f1b4 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -176,7 +176,7 @@ TEST_F(SubprocessServerTest, TestTimeoutBeforeQueueing) {
SubprocessResponsePB response;
Status s = server_->Execute(&request, &response);
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "timed out before queueing call");
+ ASSERT_STR_CONTAINS(s.ToString(), "couldn't enqueue call");
}
// Test when we try sending too many requests at once.
@@ -213,7 +213,7 @@ TEST_F(SubprocessServerTest, TestTimeoutWhileQueueingCalls) {
bool has_timeout_when_queueing = false;
for (const auto& s : results) {
if (s.IsTimedOut() &&
- s.ToString().find("timed out trying to queue call") != string::npos) {
+ s.ToString().find("couldn't enqueue call") != string::npos) {
has_timeout_when_queueing = true;
}
}
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index a2271ff..bc897e1 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/util/blocking_queue.h"
+
#include <cstddef>
#include <cstdint>
+#include <list>
#include <map>
#include <string>
#include <thread>
@@ -26,9 +29,9 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/countdown_latch.h"
-#include "kudu/util/blocking_queue.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -49,11 +52,11 @@ void InsertSomeThings() {
TEST(BlockingQueueTest, Test1) {
thread inserter_thread(InsertSomeThings);
int32_t i;
- ASSERT_TRUE(test1_queue.BlockingGet(&i));
+ ASSERT_OK(test1_queue.BlockingGet(&i));
ASSERT_EQ(1, i);
- ASSERT_TRUE(test1_queue.BlockingGet(&i));
+ ASSERT_OK(test1_queue.BlockingGet(&i));
ASSERT_EQ(2, i);
- ASSERT_TRUE(test1_queue.BlockingGet(&i));
+ ASSERT_OK(test1_queue.BlockingGet(&i));
ASSERT_EQ(3, i);
inserter_thread.join();
}
@@ -79,6 +82,83 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
ASSERT_TRUE(s.IsAborted());
}
+TEST(BlockingQueueTest, TestBlockingPut) {
+ const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+ const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+ BlockingQueue<int32_t> test_queue(2);
+
+ // First, a trivial check that we don't do anything if our deadline has
+ // already passed.
+ Status s = test_queue.BlockingPut(1, MonoTime::Now() - kShortTimeout);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+ // Now put a couple elements onto the queue.
+ ASSERT_OK(test_queue.BlockingPut(1));
+ ASSERT_OK(test_queue.BlockingPut(2));
+
+ // We're at capacity, so further puts should time out...
+ s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+ // ... until space is freed up before the deadline.
+ thread t([&] {
+ SleepFor(kEvenShorterTimeout);
+ int out;
+ ASSERT_OK(test_queue.BlockingGet(&out));
+ });
+ SCOPED_CLEANUP({
+ t.join();
+ });
+ ASSERT_OK(test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout));
+
+ // If we shut down, we shouldn't be able to put more elements onto the queue.
+ test_queue.Shutdown();
+ s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+ ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
+TEST(BlockingQueueTest, TestBlockingGet) {
+ const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+ const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+ BlockingQueue<int32_t> test_queue(2);
+ ASSERT_OK(test_queue.BlockingPut(1));
+ ASSERT_OK(test_queue.BlockingPut(2));
+
+ // Test that if we have stuff in our queue, regardless of deadlines, we'll be
+ // able to get them out.
+ int32_t val = 0;
+ ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout));
+ ASSERT_EQ(1, val);
+ ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+ ASSERT_EQ(2, val);
+
+ // But without stuff in the queue, we'll time out...
+ Status s = test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+ // ... until new elements show up.
+ thread t([&] {
+ SleepFor(kEvenShorterTimeout);
+ ASSERT_OK(test_queue.BlockingPut(3));
+ });
+ SCOPED_CLEANUP({
+ t.join();
+ });
+ ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+ ASSERT_EQ(3, val);
+
+ // If we shut down with stuff in our queue, we'll continue to return those
+ // elements. Otherwise, we'll return an error.
+ ASSERT_OK(test_queue.BlockingPut(4));
+ test_queue.Shutdown();
+ ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+ ASSERT_EQ(4, val);
+ s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+ ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
// Test that, when the queue is shut down with elements still pending,
// Drain still returns OK until the elements are all gone.
TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
@@ -91,7 +171,7 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
// Get() should still return an element.
int i;
- ASSERT_TRUE(q.BlockingGet(&i));
+ ASSERT_OK(q.BlockingGet(&i));
ASSERT_EQ(1, i);
// Drain should still return OK, since it yielded elements.
@@ -102,7 +182,8 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
// Now that it's empty, it should return Aborted.
Status s = q.BlockingDrainTo(&out);
ASSERT_TRUE(s.IsAborted()) << s.ToString();
- ASSERT_FALSE(q.BlockingGet(&i));
+ s = q.BlockingGet(&i);
+ ASSERT_FALSE(s.ok()) << s.ToString();
}
TEST(BlockingQueueTest, TestTooManyInsertions) {
@@ -154,9 +235,10 @@ TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
test_queue.Shutdown();
ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
int64_t i;
- ASSERT_TRUE(test_queue.BlockingGet(&i));
+ ASSERT_OK(test_queue.BlockingGet(&i));
ASSERT_EQ(123, i);
- ASSERT_FALSE(test_queue.BlockingGet(&i));
+ Status s = test_queue.BlockingGet(&i);
+ ASSERT_FALSE(s.ok()) << s.ToString();
}
TEST(BlockingQueueTest, TestGscopedPtrMethods) {
@@ -164,7 +246,7 @@ TEST(BlockingQueueTest, TestGscopedPtrMethods) {
gscoped_ptr<int> input_int(new int(123));
ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
gscoped_ptr<int> output_int;
- ASSERT_TRUE(test_queue.BlockingGet(&output_int));
+ ASSERT_OK(test_queue.BlockingGet(&output_int));
ASSERT_EQ(123, *output_int.get());
test_queue.Shutdown();
}
@@ -187,7 +269,7 @@ class MultiThreadTest {
sync_latch_.CountDown();
sync_latch_.Wait();
for (int i = 0; i < blocking_puts_; i++) {
- ASSERT_TRUE(queue_.BlockingPut(arg));
+ ASSERT_OK(queue_.BlockingPut(arg));
}
MutexLock guard(lock_);
if (--num_inserters_ == 0) {
@@ -198,8 +280,8 @@ class MultiThreadTest {
void RemoverThread() {
for (int i = 0; i < puts_ + blocking_puts_; i++) {
int32_t arg = 0;
- bool got = queue_.BlockingGet(&arg);
- if (!got) {
+ Status s = queue_.BlockingGet(&arg);
+ if (!s.ok()) {
arg = -1;
}
MutexLock guard(lock_);
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 7331c12..a26b3aa 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -14,8 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
-#define KUDU_UTIL_BLOCKING_QUEUE_H
+#pragma once
#include <list>
#include <string>
@@ -70,9 +69,17 @@ class BlockingQueue {
<< "BlockingQueue holds bare pointers at destruction time";
}
- // Get an element from the queue. Returns false if we were shut down prior to
- // getting the element.
- bool BlockingGet(T *out) {
+ // Gets an element from the queue; if the queue is empty, blocks until the
+ // queue becomes non-empty, or until the deadline passes.
+ //
+ // If the queue has been shut down but there are still elements in the queue,
+ // it returns those elements as if the queue were not yet shut down.
+ //
+ // Returns:
+ // - OK if successful
+ // - TimedOut if the deadline passed
+ // - Aborted if the queue shut down
+ Status BlockingGet(T *out, MonoTime deadline = MonoTime()) {
MutexLock l(lock_);
while (true) {
if (!list_.empty()) {
@@ -80,25 +87,26 @@ class BlockingQueue {
list_.pop_front();
decrement_size_unlocked(*out);
not_full_.Signal();
- return true;
+ return Status::OK();
}
if (shutdown_) {
- return false;
+ return Status::Aborted("");
+ }
+ if (!deadline.Initialized()) {
+ not_empty_.Wait();
+ } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
+ return Status::TimedOut("");
}
- not_empty_.Wait();
}
}
// Get an element from the queue. Returns false if the queue is empty and
// we were shut down prior to getting the element.
- bool BlockingGet(gscoped_ptr<T_VAL> *out) {
- T t = NULL;
- bool got_element = BlockingGet(&t);
- if (!got_element) {
- return false;
- }
+ Status BlockingGet(gscoped_ptr<T_VAL> *out, MonoTime deadline = MonoTime()) {
+ T t = nullptr;
+ RETURN_NOT_OK(BlockingGet(&t, deadline));
out->reset(t);
- return true;
+ return Status::OK();
}
// Get all elements from the queue and append them to a vector.
@@ -168,34 +176,47 @@ class BlockingQueue {
return s;
}
- // Gets an element for the queue; if the queue is full, blocks until
- // space becomes available. Returns false if we were shutdown prior
- // to enqueueing the element.
- bool BlockingPut(const T& val) {
+ // Puts an element onto the queue; if the queue is full, blocks until space
+ // becomes available, or until the deadline passes.
+ //
+ // NOTE: unlike BlockingGet() and BlockingDrainTo(), which succeed as long as
+ // there are elements in the queue (regardless of deadline), if the deadline
+ // has passed, an error will be returned even if there is space in the queue.
+ //
+ // Returns:
+ // - OK if successful
+ // - TimedOut if the deadline passed
+ // - Aborted if the queue shut down
+ Status BlockingPut(const T& val, MonoTime deadline = MonoTime()) {
+ if (deadline.Initialized() && MonoTime::Now() > deadline) {
+ return Status::TimedOut("");
+ }
MutexLock l(lock_);
while (true) {
if (shutdown_) {
- return false;
+ return Status::Aborted("");
}
if (size_ < max_size_) {
list_.push_back(val);
increment_size_unlocked(val);
l.Unlock();
not_empty_.Signal();
- return true;
+ return Status::OK();
+ }
+ if (!deadline.Initialized()) {
+ not_full_.Wait();
+ } else if (PREDICT_FALSE(!not_full_.WaitUntil(deadline))) {
+ return Status::TimedOut("");
}
- not_full_.Wait();
}
}
// Same as other BlockingPut() overload above. If the element was
// enqueued, gscoped_ptr releases its contents.
- bool BlockingPut(gscoped_ptr<T_VAL>* val) {
- bool ret = Put(val->get());
- if (ret) {
- ignore_result(val->release());
- }
- return ret;
+ Status BlockingPut(gscoped_ptr<T_VAL>* val, MonoTime deadline = MonoTime()) {
+ RETURN_NOT_OK(BlockingPut(val->get(), deadline));
+ ignore_result(val->release());
+ return Status::OK();
}
// Shut down the queue.
@@ -253,4 +274,3 @@ class BlockingQueue {
} // namespace kudu
-#endif