You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@kudu.apache.org by to...@apache.org on 2016/08/16 06:08:35 UTC

[4/4] kudu git commit: Start a background thread to run ResultTracker GC

Start a background thread to run ResultTracker GC

Change-Id: Ia34ce95e78920596eb8b9db53643845f637c8e6c
Reviewed-on: http://gerrit.cloudera.org:8080/3961
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dccca07c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dccca07c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dccca07c

Branch: refs/heads/master
Commit: dccca07cf14c3d32a80084d0f38d4fe8f0ae1d27
Parents: fc09ddc
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Aug 12 17:07:40 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 16 05:58:58 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/exactly_once_rpc-test.cc | 32 ++++++++++--------------------
 src/kudu/rpc/result_tracker.cc        | 25 ++++++++++++++++++++++-
 src/kudu/rpc/result_tracker.h         | 18 +++++++++++++++--
 src/kudu/rpc/service_if.cc            |  2 +-
 src/kudu/server/server_base.cc        |  2 ++
 5 files changed, 54 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index 3f3f59e..0eacd42 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -21,6 +21,7 @@
 
 DECLARE_int64(remember_clients_ttl_ms);
 DECLARE_int64(remember_responses_ttl_ms);
+DECLARE_int64(result_tracker_gc_interval_ms);
 
 using std::atomic_int;
 using std::shared_ptr;
@@ -247,16 +248,6 @@ class ExactlyOnceRpcTest : public RpcTestBase {
     request_tracker_->RpcCompleted(seq_no);
   }
 
-  // Continuously runs GC on the ResultTracker.
-  void RunGcThread(MonoDelta run_for) {
-    MonoTime run_until = MonoTime::Now();
-    run_until.AddDelta(run_for);
-    while (MonoTime::Now().ComesBefore(run_until)) {
-      result_tracker_->GCResults();
-      SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
-    }
-  }
-
 
   // This continuously issues calls to the server, that often last longer than
   // 'remember_responses_ttl_ms', making sure that we don't get errors back.
@@ -546,27 +537,22 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
 TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
   FLAGS_remember_clients_ttl_ms = 100;
   FLAGS_remember_responses_ttl_ms = 10;
+  FLAGS_result_tracker_gc_interval_ms = 10;
 
   StartServer();
 
-  // The write thread runs for the shortest period to make sure client GC has a
+  // The write thread runs for a shorter period to make sure client GC has a
   // chance to run.
   MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
   MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
-  // GC runs for the longest period because the stubborn thread may wait beyond its deadline
-  // to wait on client GC.
-  MonoDelta gc_run_for = MonoDelta::FromSeconds(4);
   if (AllowSlowTests()) {
     writes_run_for = MonoDelta::FromSeconds(10);
     stubborn_run_for = MonoDelta::FromSeconds(11);
-    gc_run_for = MonoDelta::FromSeconds(12);
   }
 
-  scoped_refptr<kudu::Thread> gc_thread;
   scoped_refptr<kudu::Thread> write_thread;
   scoped_refptr<kudu::Thread> stubborn_thread;
-  CHECK_OK(kudu::Thread::Create(
-      "gc", "gc", &ExactlyOnceRpcTest::RunGcThread, this, gc_run_for, &gc_thread));
+  result_tracker_->StartGCThread();
   CHECK_OK(kudu::Thread::Create(
       "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
       this, writes_run_for, &write_thread));
@@ -574,13 +560,17 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest)
       "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
       this, stubborn_run_for, &stubborn_thread));
 
-  gc_thread->Join();
   write_thread->Join();
   stubborn_thread->Join();
 
-  result_tracker_->GCResults();
-  ASSERT_EQ(0, mem_tracker_->consumption());
+  // Within a few seconds, the consumption should be back to zero.
+  // Really, this should be within 100ms, but we'll give it a bit of
+  // time to avoid test flakiness.
+  AssertEventually([&]() {
+      ASSERT_EQ(0, mem_tracker_->consumption());
+    }, MonoDelta::FromSeconds(5));
 }
 
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index a75b3e1..259d12a 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -45,6 +45,10 @@ DEFINE_int64(remember_responses_ttl_ms, 600 * 1000 /* 10 mins */,
     "STALE.");
 TAG_FLAG(remember_responses_ttl_ms, advanced);
 
+DEFINE_int64(result_tracker_gc_interval_ms, 1000,
+    "Interval at which the result tracker will look for entries to GC.");
+TAG_FLAG(result_tracker_gc_interval_ms, hidden);
+
 namespace kudu {
 namespace rpc {
 
@@ -89,9 +93,15 @@ struct ScopedMemTrackerUpdater {
 ResultTracker::ResultTracker(shared_ptr<MemTracker> mem_tracker)
     : mem_tracker_(std::move(mem_tracker)),
       clients_(ClientStateMap::key_compare(),
-               ClientStateMapAllocator(mem_tracker_)) {}
+               ClientStateMapAllocator(mem_tracker_)),
+      gc_thread_stop_latch_(1) {}
 
 ResultTracker::~ResultTracker() {
+  if (gc_thread_) {
+    gc_thread_stop_latch_.CountDown();
+    gc_thread_->Join();
+  }
+
   lock_guard<simple_spinlock> l(lock_);
   // Release all the memory for the stuff we'll delete on destruction.
   for (auto& client_state : clients_) {
@@ -411,6 +421,19 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
   FailAndRespondInternal(request_id, func);
 }
 
+void ResultTracker::StartGCThread() {
+  CHECK(!gc_thread_);
+  CHECK_OK(Thread::Create("server", "result-tracker", &ResultTracker::RunGCThread,
+                          this, &gc_thread_));
+}
+
+void ResultTracker::RunGCThread() {
+  while (!gc_thread_stop_latch_.WaitFor(MonoDelta::FromMilliseconds(
+             FLAGS_result_tracker_gc_interval_ms))) {
+    GCResults();
+  }
+}
+
 void ResultTracker::GCResults() {
   lock_guard<simple_spinlock> l(lock_);
   MonoTime now = MonoTime::Now();

http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index 5c1d518..f629d7a 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -26,10 +26,12 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/rpc/request_tracker.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/thread.h"
 
 namespace google {
 namespace protobuf {
@@ -141,8 +143,6 @@ class RpcContext;
 // }
 //
 // This class is thread safe.
-//
-// TODO Garbage collection.
 class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
  public:
   typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
@@ -222,6 +222,12 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
                       int error_ext_id, const std::string& message,
                       const google::protobuf::Message& app_error_pb);
 
+  // Start a background thread which periodically runs GCResults().
+  // This thread is automatically stopped in the destructor.
+  //
+  // Must be called at most once.
+  void StartGCThread();
+
   // Runs time-based garbage collection on the results this result tracker is caching.
   // When garbage collection runs, it goes through all ClientStates and:
   // - If a ClientState is older than the 'remember_clients_ttl_ms' flag and no
@@ -230,6 +236,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
   //   through all CompletionRecords and:
   //   - If the CompletionRecord is older than the 'remember_responses_ttl_secs' flag,
   //     GCs the CompletionRecord and advances the 'stale_before_seq_no' watermark.
+  //
+  // Typically this is invoked from an internal thread started by 'StartGCThread()'.
   void GCResults();
 
   string ToString();
@@ -361,6 +369,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
 
   std::string ToStringUnlocked() const;
 
+  void RunGCThread();
+
   // The memory tracker that tracks this ResultTracker's memory consumption.
   std::shared_ptr<kudu::MemTracker> mem_tracker_;
 
@@ -378,6 +388,10 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
 
   ClientStateMap clients_;
 
+  // The thread which runs GC, and a latch to stop it.
+  scoped_refptr<Thread> gc_thread_;
+  CountDownLatch gc_thread_stop_latch_;
+
   DISALLOW_COPY_AND_ASSIGN(ResultTracker);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index f3863f2..d64647b 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -29,7 +29,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/util/flag_tags.h"
 
-// TODO remove this once we have ResultTracker GC
+// TODO remove this once we have fully cluster-tested this.
 DEFINE_bool(enable_exactly_once, false, "Whether to enable exactly once semantics on the client "
     "(experimental).");
 TAG_FLAG(enable_exactly_once, experimental);

http://git-wip-us.apache.org/repos/asf/kudu/blob/dccca07c/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 6010f23..6d84393 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -185,6 +185,8 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK_PREPEND(StartMetricsLogging(), "Could not enable metrics logging");
 
+  result_tracker_->StartGCThread();
+
   return Status::OK();
 }