You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/13 07:14:17 UTC

[3/3] kudu git commit: [client] avoid circular deps in time-based flusher

[client] avoid circular deps in time-based flusher

boost::bind() converts its bound arguments to the target parameter types
during the call, not when the functor object is created:
  http://stackoverflow.com/questions/11255144/why-does-boostbind-store-arguments-of-the-type-passed-in-rather-than-of-the-ty

So, it's necessary to pass weak pointers to the background auto-flush
task to avoid circular dependencies between client::KuduSession::Data
and rpc::Messenger.  Besides, it does not make much sense to store a
shared reference to the messenger in KuduSession::Data since it's always
passed as a weak reference and then promoted to a shared one during
the call in all usage scenarios.

Thanks to Adar and Todd for spotting the usual suspect there.

This is a follow-up for 93be1310d227cf05025864654ca3f6713c2ddc2c.

Change-Id: I59825981a600f5882ee476479c2ddf16b495c1f9
Reviewed-on: http://gerrit.cloudera.org:8080/4395
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Tested-by: Todd Lipcon <to...@apache.org>
(cherry picked from commit 1a062253e3fdc900a4b0b418520d2870b6de8846)
Reviewed-on: http://gerrit.cloudera.org:8080/4403
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6f6e49ca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6f6e49ca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6f6e49ca

Branch: refs/heads/branch-1.0.x
Commit: 6f6e49ca98c3e3be7d81f88ab8a0f9173959b191
Parents: b866f40
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Sep 12 18:03:27 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Sep 13 07:13:21 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/session-internal.cc | 20 ++++++++++----------
 src/kudu/client/session-internal.h  | 10 +++++-----
 2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6f6e49ca/src/kudu/client/session-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 8da7679..661288d 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -40,7 +40,7 @@ using sp::weak_ptr;
 
 
 KuduSession::Data::Data(shared_ptr<KuduClient> client,
-                        std::shared_ptr<rpc::Messenger> messenger)
+                        std::weak_ptr<rpc::Messenger> messenger)
     : client_(std::move(client)),
       messenger_(std::move(messenger)),
       error_collector_(new ErrorCollector()),
@@ -58,8 +58,8 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
       buffer_pre_flush_enabled_(true) {
 }
 
-void KuduSession::Data::Init(const weak_ptr<KuduSession>& session) {
-  TimeBasedFlushInit(session);
+void KuduSession::Data::Init(weak_ptr<KuduSession> session) {
+  TimeBasedFlushInit(std::move(session));
 }
 
 void KuduSession::Data::FlushFinished(Batcher* batcher) {
@@ -109,7 +109,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
 }
 
 Status KuduSession::Data::SetFlushMode(FlushMode mode,
-                                       const sp::weak_ptr<KuduSession>& session) {
+                                       sp::weak_ptr<KuduSession> session) {
   {
     std::lock_guard<Mutex> l(mutex_);
     if (HasPendingOperationsUnlocked()) {
@@ -129,7 +129,7 @@ Status KuduSession::Data::SetFlushMode(FlushMode mode,
     flush_mode_ = mode;
   }
 
-  TimeBasedFlushInit(session);
+  TimeBasedFlushInit(std::move(session));
 
   return Status::OK();
 }
@@ -320,7 +320,7 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
 // batcher (success path) or in the error collector (failure path). Otherwise
 // it would be a memory leak.
 Status KuduSession::Data::ApplyWriteOp(
-    const sp::weak_ptr<KuduSession>& weak_session,
+    sp::weak_ptr<KuduSession> weak_session,
     KuduWriteOperation* write_op) {
 
   if (!write_op) {
@@ -429,7 +429,7 @@ Status KuduSession::Data::ApplyWriteOp(
       // of control since no thread-safety is advertised
       // for the kudu::KuduSession interface.
       scoped_refptr<Batcher> batcher(
-          new Batcher(client_.get(), error_collector_, weak_session,
+          new Batcher(client_.get(), error_collector_, std::move(weak_session),
                       external_consistency_mode_));
       if (timeout_ms_ != -1) {
         batcher->SetTimeoutMillis(timeout_ms_);
@@ -462,7 +462,7 @@ Status KuduSession::Data::ApplyWriteOp(
 void KuduSession::Data::TimeBasedFlushInit(
     sp::weak_ptr<KuduSession> weak_session) {
   KuduSession::Data::TimeBasedFlushTask(
-      Status::OK(), messenger_, weak_session, true);
+      Status::OK(), messenger_, std::move(weak_session), true);
 }
 
 void KuduSession::Data::TimeBasedFlushTask(
@@ -513,8 +513,8 @@ void KuduSession::Data::TimeBasedFlushTask(
   std::shared_ptr<rpc::Messenger> messenger(weak_messenger.lock());
   if (PREDICT_TRUE(messenger)) {
     messenger->ScheduleOnReactor(
-        boost::bind(&KuduSession::Data::TimeBasedFlushTask,
-                    _1, messenger, session, false),
+        boost::bind(&KuduSession::Data::TimeBasedFlushTask, _1,
+                    std::move(weak_messenger), std::move(weak_session), false),
         next_run);
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/6f6e49ca/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index e050301..fe2225c 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -55,9 +55,9 @@ class ErrorCollector;
 class KuduSession::Data {
  public:
   explicit Data(sp::shared_ptr<KuduClient> client,
-                std::shared_ptr<rpc::Messenger> messenger);
+                std::weak_ptr<rpc::Messenger> messenger);
 
-  void Init(const sp::weak_ptr<KuduSession>& session);
+  void Init(sp::weak_ptr<KuduSession> session);
 
   // Called by Batcher when a flush has finished.
   void FlushFinished(internal::Batcher* b);
@@ -68,7 +68,7 @@ class KuduSession::Data {
   Status Close(bool force);
 
   // Set flush mode for the session.
-  Status SetFlushMode(FlushMode mode, const sp::weak_ptr<KuduSession>& session);
+  Status SetFlushMode(FlushMode mode, sp::weak_ptr<KuduSession> session);
 
   // Set external consistency mode for the session.
   Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);
@@ -123,7 +123,7 @@ class KuduSession::Data {
   MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);
 
   // Apply a write operation, i.e. push it through the batcher chain.
-  Status ApplyWriteOp(const sp::weak_ptr<KuduSession>& session,
+  Status ApplyWriteOp(sp::weak_ptr<KuduSession> session,
                       KuduWriteOperation* write_op);
 
   // Check and start the time-based flush task in background, if necessary.
@@ -160,7 +160,7 @@ class KuduSession::Data {
 
   // The reference to the client's messenger (keeping the reference instead of
   // declaring friendship to KuduClient and accessing it via the client_).
-  std::shared_ptr<rpc::Messenger> messenger_;
+  std::weak_ptr<rpc::Messenger> messenger_;
 
   // Buffer for errors.
   scoped_refptr<internal::ErrorCollector> error_collector_;