You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in this rendering.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/16 06:08:35 UTC
[4/4] kudu git commit: Start a background thread to run ResultTracker GC
Start a background thread to run ResultTracker GC
Change-Id: Ia34ce95e78920596eb8b9db53643845f637c8e6c
Reviewed-on: http://gerrit.cloudera.org:8080/3961
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dccca07c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dccca07c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dccca07c
Branch: refs/heads/master
Commit: dccca07cf14c3d32a80084d0f38d4fe8f0ae1d27
Parents: fc09ddc
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Aug 12 17:07:40 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 16 05:58:58 2016 +0000
----------------------------------------------------------------------
src/kudu/rpc/exactly_once_rpc-test.cc | 32 ++++++++++--------------------
src/kudu/rpc/result_tracker.cc | 25 ++++++++++++++++++++++-
src/kudu/rpc/result_tracker.h | 18 +++++++++++++++--
src/kudu/rpc/service_if.cc | 2 +-
src/kudu/server/server_base.cc | 2 ++
5 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index 3f3f59e..0eacd42 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -21,6 +21,7 @@
DECLARE_int64(remember_clients_ttl_ms);
DECLARE_int64(remember_responses_ttl_ms);
+DECLARE_int64(result_tracker_gc_interval_ms);
using std::atomic_int;
using std::shared_ptr;
@@ -247,16 +248,6 @@ class ExactlyOnceRpcTest : public RpcTestBase {
request_tracker_->RpcCompleted(seq_no);
}
- // Continuously runs GC on the ResultTracker.
- void RunGcThread(MonoDelta run_for) {
- MonoTime run_until = MonoTime::Now();
- run_until.AddDelta(run_for);
- while (MonoTime::Now().ComesBefore(run_until)) {
- result_tracker_->GCResults();
- SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
- }
- }
-
// This continuously issues calls to the server, that often last longer than
// 'remember_responses_ttl_ms', making sure that we don't get errors back.
@@ -546,27 +537,22 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
FLAGS_remember_clients_ttl_ms = 100;
FLAGS_remember_responses_ttl_ms = 10;
+ FLAGS_result_tracker_gc_interval_ms = 10;
StartServer();
- // The write thread runs for the shortest period to make sure client GC has a
+ // The write thread runs for a shorter period to make sure client GC has a
// chance to run.
MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
- // GC runs for the longest period because the stubborn thread may wait beyond its deadline
- // to wait on client GC.
- MonoDelta gc_run_for = MonoDelta::FromSeconds(4);
if (AllowSlowTests()) {
writes_run_for = MonoDelta::FromSeconds(10);
stubborn_run_for = MonoDelta::FromSeconds(11);
- gc_run_for = MonoDelta::FromSeconds(12);
}
- scoped_refptr<kudu::Thread> gc_thread;
scoped_refptr<kudu::Thread> write_thread;
scoped_refptr<kudu::Thread> stubborn_thread;
- CHECK_OK(kudu::Thread::Create(
- "gc", "gc", &ExactlyOnceRpcTest::RunGcThread, this, gc_run_for, &gc_thread));
+ result_tracker_->StartGCThread();
CHECK_OK(kudu::Thread::Create(
"write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
this, writes_run_for, &write_thread));
@@ -574,13 +560,17 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest)
"stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
this, stubborn_run_for, &stubborn_thread));
- gc_thread->Join();
write_thread->Join();
stubborn_thread->Join();
- result_tracker_->GCResults();
- ASSERT_EQ(0, mem_tracker_->consumption());
+ // Within a few seconds, the consumption should be back to zero.
+ // Really, this should be within 100ms, but we'll give it a bit of
+ // time to avoid test flakiness.
+ AssertEventually([&]() {
+ ASSERT_EQ(0, mem_tracker_->consumption());
+ }, MonoDelta::FromSeconds(5));
}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index a75b3e1..259d12a 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -45,6 +45,10 @@ DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */,
"STALE.");
TAG_FLAG(remember_responses_ttl_ms, advanced);
+DEFINE_int64(result_tracker_gc_interval_ms, 1000,
+ "Interval at which the result tracker will look for entries to GC.");
+TAG_FLAG(result_tracker_gc_interval_ms, hidden);
+
namespace kudu {
namespace rpc {
@@ -89,9 +93,15 @@ struct ScopedMemTrackerUpdater {
ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
: mem_tracker_(std::move(mem_tracker)),
clients_(ClientStateMap::key_compare(),
- ClientStateMapAllocator(mem_tracker_)) {}
+ ClientStateMapAllocator(mem_tracker_)),
+ gc_thread_stop_latch_(1) {}
ResultTracker::~ResultTracker() {
+ if (gc_thread_) {
+ gc_thread_stop_latch_.CountDown();
+ gc_thread_->Join();
+ }
+
lock_guard<simple_spinlock> l(lock_);
// Release all the memory for the stuff we'll delete on destruction.
for (auto& client_state : clients_) {
@@ -411,6 +421,19 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
FailAndRespondInternal(request_id, func);
}
+void ResultTracker::StartGCThread() {
+ CHECK(!gc_thread_);
+ CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread,
+ this, &gc_thread_));
+}
+
+void ResultTracker::RunGCThread() {
+ while (!gc_thread_stop_latch_.WaitFor(MonoDelta::FromMilliseconds(
+ FLAGS_result_tracker_gc_interval_ms))) {
+ GCResults();
+ }
+}
+
void ResultTracker::GCResults() {
lock_guard<simple_spinlock> l(lock_);
MonoTime now = MonoTime::Now();
http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index 5c1d518..f629d7a 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -26,10 +26,12 @@
#include "kudu/gutil/stl_util.h"
#include "kudu/rpc/request_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/thread.h"
namespace google {
namespace protobuf {
@@ -141,8 +143,6 @@ class RpcContext;
// }
//
// This class is thread safe.
-//
-// TODO Garbage collection.
class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
public:
typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
@@ -222,6 +222,12 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
int error_ext_id, const std::string& message,
const google::protobuf::Message& app_error_pb);
+ // Start a background thread which periodically runs GCResults().
+ // This thread is automatically stopped in the destructor.
+ //
+ // Must be called at most once.
+ void StartGCThread();
+
// Runs time-based garbage collection on the results this result tracker is caching.
// When garbage collection runs, it goes through all ClientStates and:
// - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
@@ -230,6 +236,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
// through all CompletionRecords and:
// - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag,
// GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
+ //
+ // Typically this is invoked from an internal thread started by 'StartGCThread()'.
void GCResults();
string ToString();
@@ -361,6 +369,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
std::string ToStringUnlocked() const;
+ void RunGCThread();
+
// The memory tracker that tracks this ResultTracker's memory consumption.
std::shared_ptr<kudu::MemTracker> mem_tracker_;
@@ -378,6 +388,10 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
ClientStateMap clients_;
+ // The thread which runs GC, and a latch to stop it.
+ scoped_refptr<Thread> gc_thread_;
+ CountDownLatch gc_thread_stop_latch_;
+
DISALLOW_COPY_AND_ASSIGN(ResultTracker);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index f3863f2..d64647b 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -29,7 +29,7 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/util/flag_tags.h"
-// TODO remove this once we have ResultTracker GC
+// TODO remove this once we have fully cluster-tested this.
DEFINE_bool(enable_exactly_once, false, "Whether to enable exactly once semantics on the client "
"(experimental).");
TAG_FLAG(enable_exactly_once, experimental);
http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 6010f23..6d84393 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -185,6 +185,8 @@ Status ServerBase::Init() {
RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
+ result_tracker_->StartGCThread();
+
return Status::OK();
}