You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/13 07:14:17 UTC
[3/3] kudu git commit: [client] avoid circular deps in time-based
flusher
[client] avoid circular deps in time-based flusher
The boost::bind() makes cast of parameters during the call,
not during creation of the functor object:
http://stackoverflow.com/questions/11255144/why-does-boostbind-store-arguments-of-the-type-passed-in-rather-than-of-the-ty
So, it's necessary to pass weak pointers to the background auto-flush
task to avoid circular dependencies between client::KuduSession::Data
and rpc::Messenger. Besides, it does not make much sense to store
shared reference to messenger in KuduSession::Data since it's always
passed as a weak reference and then promoting to a shared one during
the call in all usage scenarios.
Thanks to Adar and Todd spotting the usual suspect there.
This is a follow-up for 93be1310d227cf05025864654ca3f6713c2ddc2c.
Change-Id: I59825981a600f5882ee476479c2ddf16b495c1f9
Reviewed-on: http://gerrit.cloudera.org:8080/4395
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Tested-by: Todd Lipcon <to...@apache.org>
(cherry picked from commit 1a062253e3fdc900a4b0b418520d2870b6de8846)
Reviewed-on: http://gerrit.cloudera.org:8080/4403
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6f6e49ca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6f6e49ca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6f6e49ca
Branch: refs/heads/branch-1.0.x
Commit: 6f6e49ca98c3e3be7d81f88ab8a0f9173959b191
Parents: b866f40
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Sep 12 18:03:27 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Sep 13 07:13:21 2016 +0000
----------------------------------------------------------------------
src/kudu/client/session-internal.cc | 20 ++++++++++----------
src/kudu/client/session-internal.h | 10 +++++-----
2 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/6f6e49ca/src/kudu/client/session-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.cc b/src/kudu/client/session-internal.cc
index 8da7679..661288d 100644
--- a/src/kudu/client/session-internal.cc
+++ b/src/kudu/client/session-internal.cc
@@ -40,7 +40,7 @@ using sp::weak_ptr;
KuduSession::Data::Data(shared_ptr<KuduClient> client,
- std::shared_ptr<rpc::Messenger> messenger)
+ std::weak_ptr<rpc::Messenger> messenger)
: client_(std::move(client)),
messenger_(std::move(messenger)),
error_collector_(new ErrorCollector()),
@@ -58,8 +58,8 @@ KuduSession::Data::Data(shared_ptr<KuduClient> client,
buffer_pre_flush_enabled_(true) {
}
-void KuduSession::Data::Init(const weak_ptr<KuduSession>& session) {
- TimeBasedFlushInit(session);
+void KuduSession::Data::Init(weak_ptr<KuduSession> session) {
+ TimeBasedFlushInit(std::move(session));
}
void KuduSession::Data::FlushFinished(Batcher* batcher) {
@@ -109,7 +109,7 @@ Status KuduSession::Data::SetExternalConsistencyMode(
}
Status KuduSession::Data::SetFlushMode(FlushMode mode,
- const sp::weak_ptr<KuduSession>& session) {
+ sp::weak_ptr<KuduSession> session) {
{
std::lock_guard<Mutex> l(mutex_);
if (HasPendingOperationsUnlocked()) {
@@ -129,7 +129,7 @@ Status KuduSession::Data::SetFlushMode(FlushMode mode,
flush_mode_ = mode;
}
- TimeBasedFlushInit(session);
+ TimeBasedFlushInit(std::move(session));
return Status::OK();
}
@@ -320,7 +320,7 @@ MonoDelta KuduSession::Data::FlushCurrentBatcher(const MonoDelta& max_age) {
// batcher (success path) or in the error collector (failure path). Otherwise
// it would be a memory leak.
Status KuduSession::Data::ApplyWriteOp(
- const sp::weak_ptr<KuduSession>& weak_session,
+ sp::weak_ptr<KuduSession> weak_session,
KuduWriteOperation* write_op) {
if (!write_op) {
@@ -429,7 +429,7 @@ Status KuduSession::Data::ApplyWriteOp(
// of control since no thread-safety is advertised
// for the kudu::KuduSession interface.
scoped_refptr<Batcher> batcher(
- new Batcher(client_.get(), error_collector_, weak_session,
+ new Batcher(client_.get(), error_collector_, std::move(weak_session),
external_consistency_mode_));
if (timeout_ms_ != -1) {
batcher->SetTimeoutMillis(timeout_ms_);
@@ -462,7 +462,7 @@ Status KuduSession::Data::ApplyWriteOp(
void KuduSession::Data::TimeBasedFlushInit(
sp::weak_ptr<KuduSession> weak_session) {
KuduSession::Data::TimeBasedFlushTask(
- Status::OK(), messenger_, weak_session, true);
+ Status::OK(), messenger_, std::move(weak_session), true);
}
void KuduSession::Data::TimeBasedFlushTask(
@@ -513,8 +513,8 @@ void KuduSession::Data::TimeBasedFlushTask(
std::shared_ptr<rpc::Messenger> messenger(weak_messenger.lock());
if (PREDICT_TRUE(messenger)) {
messenger->ScheduleOnReactor(
- boost::bind(&KuduSession::Data::TimeBasedFlushTask,
- _1, messenger, session, false),
+ boost::bind(&KuduSession::Data::TimeBasedFlushTask, _1,
+ std::move(weak_messenger), std::move(weak_session), false),
next_run);
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/6f6e49ca/src/kudu/client/session-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/session-internal.h b/src/kudu/client/session-internal.h
index e050301..fe2225c 100644
--- a/src/kudu/client/session-internal.h
+++ b/src/kudu/client/session-internal.h
@@ -55,9 +55,9 @@ class ErrorCollector;
class KuduSession::Data {
public:
explicit Data(sp::shared_ptr<KuduClient> client,
- std::shared_ptr<rpc::Messenger> messenger);
+ std::weak_ptr<rpc::Messenger> messenger);
- void Init(const sp::weak_ptr<KuduSession>& session);
+ void Init(sp::weak_ptr<KuduSession> session);
// Called by Batcher when a flush has finished.
void FlushFinished(internal::Batcher* b);
@@ -68,7 +68,7 @@ class KuduSession::Data {
Status Close(bool force);
// Set flush mode for the session.
- Status SetFlushMode(FlushMode mode, const sp::weak_ptr<KuduSession>& session);
+ Status SetFlushMode(FlushMode mode, sp::weak_ptr<KuduSession> session);
// Set external consistency mode for the session.
Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);
@@ -123,7 +123,7 @@ class KuduSession::Data {
MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);
// Apply a write operation, i.e. push it through the batcher chain.
- Status ApplyWriteOp(const sp::weak_ptr<KuduSession>& session,
+ Status ApplyWriteOp(sp::weak_ptr<KuduSession> session,
KuduWriteOperation* write_op);
// Check and start the time-based flush task in background, if necessary.
@@ -160,7 +160,7 @@ class KuduSession::Data {
// The reference to the client's messenger (keeping the reference instead of
// declaring friendship to KuduClient and accessing it via the client_).
- std::shared_ptr<rpc::Messenger> messenger_;
+ std::weak_ptr<rpc::Messenger> messenger_;
// Buffer for errors.
scoped_refptr<internal::ErrorCollector> error_collector_;