You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/30 23:48:54 UTC
[1/2] kudu git commit: [monotime] deprecating MonoTime::Earliest()
Repository: kudu
Updated Branches:
refs/heads/master 794285592 -> 1070e764d
[monotime] deprecating MonoTime::Earliest()
Deprecating MonoTime::Earliest() in favor of std::min().
This patch does not contain any functional changes.
Change-Id: Ib7a4fd7881834db632147ebfa1afc138d6432e26
Reviewed-on: http://gerrit.cloudera.org:8080/7347
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/26749aff
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/26749aff
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/26749aff
Branch: refs/heads/master
Commit: 26749aff06e1ef27e58f1ec81db3b097ecdb34d9
Parents: 7942855
Author: Alexey Serbin <as...@cloudera.com>
Authored: Wed Jun 28 23:00:05 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Jun 30 22:01:20 2017 +0000
----------------------------------------------------------------------
src/kudu/client/client-internal.cc | 4 ++--
src/kudu/client/master_rpc.cc | 6 +++---
src/kudu/client/meta_cache.cc | 3 ++-
src/kudu/client/scanner-internal.cc | 5 ++---
src/kudu/master/catalog_manager.cc | 2 +-
src/kudu/tablet/tablet_mm_ops-test.cc | 2 +-
src/kudu/util/monotime.h | 5 ++++-
7 files changed, 15 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 6a61adc..f72b8de 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -170,9 +170,9 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
// deadline so that we reserve some time with which to find a new
// leader master and retry before the overall deadline expires.
//
- // TODO: KUDU-683 tracks cleanup for this.
+ // TODO(KUDU-683): clean this up
MonoTime rpc_deadline = now + client->default_rpc_timeout();
- rpc.set_deadline(MonoTime::Earliest(rpc_deadline, deadline));
+ rpc.set_deadline(std::min(rpc_deadline, deadline));
for (uint32_t required_feature_flag : required_feature_flags) {
rpc.RequireServerFeature(required_feature_flag);
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/client/master_rpc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/master_rpc.cc b/src/kudu/client/master_rpc.cc
index e78c349..93fde48 100644
--- a/src/kudu/client/master_rpc.cc
+++ b/src/kudu/client/master_rpc.cc
@@ -19,6 +19,7 @@
#include "kudu/client/master_rpc.h"
+#include <algorithm>
#include <mutex>
#include <boost/bind.hpp>
@@ -230,9 +231,8 @@ string ConnectToClusterRpc::ToString() const {
void ConnectToClusterRpc::SendRpc() {
// Compute the actual deadline to use for each RPC.
- MonoTime rpc_deadline = MonoTime::Now() + rpc_timeout_;
- MonoTime actual_deadline = MonoTime::Earliest(retrier().deadline(),
- rpc_deadline);
+ const MonoTime rpc_deadline = MonoTime::Now() + rpc_timeout_;
+ const MonoTime actual_deadline = std::min(retrier().deadline(), rpc_deadline);
std::lock_guard<simple_spinlock> l(lock_);
for (int i = 0; i < addrs_.size(); i++) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/client/meta_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index f7a11cd..5c57356 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -17,6 +17,7 @@
#include "kudu/client/meta_cache.h"
+#include <algorithm>
#include <map>
#include <mutex>
#include <set>
@@ -664,7 +665,7 @@ void LookupRpc::SendRpc() {
}
MonoTime rpc_deadline = now + meta_cache_->client_->default_rpc_timeout();
mutable_retrier()->mutable_controller()->set_deadline(
- MonoTime::Earliest(rpc_deadline, retrier().deadline()));
+ std::min(rpc_deadline, retrier().deadline()));
master_proxy()->GetTableLocationsAsync(req_, &resp_,
mutable_retrier()->mutable_controller(),
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index bcaf994..8dfb9f0 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -18,7 +18,6 @@
#include "kudu/client/scanner-internal.h"
#include <algorithm>
-#include <boost/bind.hpp>
#include <cmath>
#include <string>
#include <vector>
@@ -36,9 +35,9 @@
using google::protobuf::FieldDescriptor;
using google::protobuf::Reflection;
-
using std::set;
using std::string;
+using std::vector;
namespace kudu {
@@ -263,7 +262,7 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
MonoTime rpc_deadline;
if (allow_time_for_failover) {
rpc_deadline = MonoTime::Now() + table_->client()->default_rpc_timeout();
- rpc_deadline = MonoTime::Earliest(overall_deadline, rpc_deadline);
+ rpc_deadline = std::min(overall_deadline, rpc_deadline);
} else {
rpc_deadline = overall_deadline;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6a1a471..5ea25e8 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2869,7 +2869,7 @@ Status RetryingTSRpcTask::Run() {
// Calculate and set the timeout deadline.
MonoTime timeout = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
- const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
+ const MonoTime& deadline = std::min(timeout, deadline_);
rpc_.Reset();
rpc_.set_deadline(deadline);
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/tablet/tablet_mm_ops-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc
index 7422230..442d9fd 100644
--- a/src/kudu/tablet/tablet_mm_ops-test.cc
+++ b/src/kudu/tablet/tablet_mm_ops-test.cc
@@ -55,7 +55,7 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
void StatsShouldNotChange(MaintenanceOp* op) {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
- ASSERT_TRUE(next_time_.Equals(stats_.last_modified()));
+ ASSERT_EQ(next_time_, stats_.last_modified());
next_time_ = stats_.last_modified();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/26749aff/src/kudu/util/monotime.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/monotime.h b/src/kudu/util/monotime.h
index 007c54d..f261a99 100644
--- a/src/kudu/util/monotime.h
+++ b/src/kudu/util/monotime.h
@@ -22,6 +22,8 @@
#ifdef KUDU_HEADERS_NO_STUBS
#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/port.h"
#else
// This is a poor module interdependency, but the stubs are header-only and
// it's only for exported header builds, so we'll make an exception.
@@ -173,7 +175,8 @@ class KUDU_EXPORT MonoTime {
/// @param [in] b
/// The second MonoTime object to select from.
/// @return The earliest (minimum) of the two monotimes.
- static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b);
+ static const MonoTime& Earliest(const MonoTime& a, const MonoTime& b)
+ ATTRIBUTE_DEPRECATED("use std::min() instead");
/// Build a MonoTime object. The resulting object is not initialized
/// and not ready to use.
[2/2] kudu git commit: consensus_peers: schedule raft heartbeats through messenger
Posted by to...@apache.org.
consensus_peers: schedule raft heartbeats through messenger
This patch switches Peer to a Messenger-based approach (i.e.
Messenger::ScheduleOnReactor) for scheduling heartbeats. The problem with
ResettableHeartbeater was that it created a dedicated thread per replica,
and I couldn't see an easy way to share it across replicas.
The scheduling is inspired by libev's "Let the timer time out, but then
re-arm it as required" approach[1]. Callbacks are never canceled since the
Messenger doesn't support that. Instead, the next heartbeat time is
maintained as out-of-band state, and we ensure that the callback period is
low enough that it can honor any heartbeat time.
1. http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#code_ev_timer_code_relative_and_opti
Change-Id: Iac8e09fe02dd32885ef0cf644cb093b1c8e6afb8
Reviewed-on: http://gerrit.cloudera.org:8080/7331
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1070e764
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1070e764
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1070e764
Branch: refs/heads/master
Commit: 1070e764d97ce496325a277b9093f0ab3b9ee167
Parents: 26749af
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jun 28 14:27:31 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Jun 30 23:28:49 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus-test-util.h | 28 +++-
src/kudu/consensus/consensus_peers-test.cc | 13 +-
src/kudu/consensus/consensus_peers.cc | 129 +++++++++++++++----
src/kudu/consensus/consensus_peers.h | 58 ++++++---
src/kudu/consensus/leader_election-test.cc | 13 ++
src/kudu/consensus/peer_manager.cc | 1 +
.../consensus/raft_consensus_quorum-test.cc | 12 +-
.../tserver/tablet_copy_source_session-test.cc | 2 +-
8 files changed, 199 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 20cc0a9..de3e4ea 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include <boost/bind.hpp>
-#include <gmock/gmock.h>
#include <map>
#include <memory>
#include <mutex>
@@ -25,6 +23,9 @@
#include <utility>
#include <vector>
+#include <boost/bind.hpp>
+#include <gmock/gmock.h>
+
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_peers.h"
@@ -33,6 +34,7 @@
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/server/clock.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
@@ -350,15 +352,21 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
public:
NoOpTestPeerProxyFactory() {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
+ CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
}
- virtual Status NewProxy(const consensus::RaftPeerPB& peer_pb,
- gscoped_ptr<PeerProxy>* proxy) OVERRIDE {
+ Status NewProxy(const consensus::RaftPeerPB& peer_pb,
+ gscoped_ptr<PeerProxy>* proxy) override {
proxy->reset(new NoOpTestPeerProxy(pool_.get(), peer_pb));
return Status::OK();
}
+ const std::shared_ptr<rpc::Messenger>& messenger() const override {
+ return messenger_;
+ }
+ private:
gscoped_ptr<ThreadPool> pool_;
+ std::shared_ptr<rpc::Messenger> messenger_;
};
typedef std::unordered_map<std::string, scoped_refptr<RaftConsensus> > TestPeerMap;
@@ -556,10 +564,11 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
: peers_(peers) {
CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
+ CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
}
- virtual Status NewProxy(const consensus::RaftPeerPB& peer_pb,
- gscoped_ptr<PeerProxy>* proxy) OVERRIDE {
+ Status NewProxy(const consensus::RaftPeerPB& peer_pb,
+ gscoped_ptr<PeerProxy>* proxy) override {
LocalTestPeerProxy* new_proxy = new LocalTestPeerProxy(peer_pb.permanent_uuid(),
pool_.get(),
peers_);
@@ -568,12 +577,17 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
return Status::OK();
}
- virtual const vector<LocalTestPeerProxy*>& GetProxies() {
+ const vector<LocalTestPeerProxy*>& GetProxies() {
return proxies_;
}
+ const std::shared_ptr<rpc::Messenger>& messenger() const override {
+ return messenger_;
+ }
+
private:
gscoped_ptr<ThreadPool> pool_;
+ std::shared_ptr<rpc::Messenger> messenger_;
TestPeerMapManager* const peers_;
// NOTE: There is no need to delete this on the dctor because proxies are externally managed
vector<LocalTestPeerProxy*> proxies_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 1307b5b..2ff9f41 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -28,6 +28,7 @@
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_macros.h"
@@ -41,6 +42,8 @@ namespace consensus {
using log::Log;
using log::LogOptions;
+using rpc::Messenger;
+using rpc::MessengerBuilder;
using std::shared_ptr;
using std::unique_ptr;
@@ -81,10 +84,14 @@ class ConsensusPeersTest : public KuduTest {
FakeRaftPeerPB(kLeaderUuid),
kTabletId,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+
+ MessengerBuilder bld("test");
+ ASSERT_OK(bld.Build(&messenger_));
}
virtual void TearDown() OVERRIDE {
ASSERT_OK(log_->WaitUntilAllFlushed());
+ messenger_->Shutdown();
}
DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
@@ -95,12 +102,13 @@ class ConsensusPeersTest : public KuduTest {
auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
gscoped_ptr<PeerProxy> proxy(proxy_ptr);
- CHECK_OK(Peer::NewRemotePeer(peer_pb,
+ CHECK_OK(Peer::NewRemotePeer(std::move(peer_pb),
kTabletId,
kLeaderUuid,
message_queue_.get(),
raft_pool_token_.get(),
std::move(proxy),
+ messenger_,
peer));
return proxy_ptr;
}
@@ -139,6 +147,7 @@ class ConsensusPeersTest : public KuduTest {
LogOptions options_;
unique_ptr<ThreadPoolToken> raft_pool_token_;
scoped_refptr<server::Clock> clock_;
+ shared_ptr<Messenger> messenger_;
};
@@ -246,6 +255,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
message_queue_.get(),
raft_pool_token_.get(),
gscoped_ptr<PeerProxy>(mock_proxy),
+ messenger_,
&peer));
// Make the peer respond without making any progress -- it always returns
@@ -284,6 +294,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
message_queue_.get(),
raft_pool_token_.get(),
gscoped_ptr<PeerProxy>(mock_proxy),
+ messenger_,
&peer));
// Initial response has to be successful -- otherwise we'll consider the peer
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index d1c44b6..751621c 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -18,14 +18,15 @@
#include "kudu/consensus/consensus.proxy.h"
#include <algorithm>
-#include <boost/bind.hpp>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
+#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/consensus_queue.h"
@@ -33,12 +34,15 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
#include "kudu/util/threadpool.h"
DEFINE_int32(consensus_rpc_timeout_ms, 1000,
@@ -80,45 +84,54 @@ using rpc::RpcController;
using strings::Substitute;
using tserver::TabletServerErrorPB;
-Status Peer::NewRemotePeer(const RaftPeerPB& peer_pb,
- const string& tablet_id,
- const string& leader_uuid,
+Status Peer::NewRemotePeer(RaftPeerPB peer_pb,
+ string tablet_id,
+ string leader_uuid,
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
gscoped_ptr<PeerProxy> proxy,
+ shared_ptr<Messenger> messenger,
shared_ptr<Peer>* peer) {
- shared_ptr<Peer> new_peer(new Peer(peer_pb,
- tablet_id,
- leader_uuid,
- std::move(proxy),
+ shared_ptr<Peer> new_peer(new Peer(std::move(peer_pb),
+ std::move(tablet_id),
+ std::move(leader_uuid),
queue,
- raft_pool_token));
+ raft_pool_token,
+ std::move(proxy),
+ std::move(messenger)));
RETURN_NOT_OK(new_peer->Init());
*peer = std::move(new_peer);
return Status::OK();
}
-Peer::Peer(const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid,
- gscoped_ptr<PeerProxy> proxy, PeerMessageQueue* queue,
- ThreadPoolToken* raft_pool_token)
+Peer::Peer(RaftPeerPB peer_pb,
+ string tablet_id,
+ string leader_uuid,
+ PeerMessageQueue* queue,
+ ThreadPoolToken* raft_pool_token,
+ gscoped_ptr<PeerProxy> proxy,
+ shared_ptr<Messenger> messenger)
: tablet_id_(std::move(tablet_id)),
leader_uuid_(std::move(leader_uuid)),
- peer_pb_(peer_pb),
+ peer_pb_(std::move(peer_pb)),
proxy_(std::move(proxy)),
queue_(queue),
failed_attempts_(0),
- heartbeater_(
- peer_pb.permanent_uuid(),
- MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms),
- boost::bind(&Peer::SignalRequest, this, true)),
- raft_pool_token_(raft_pool_token) {
+ messenger_(std::move(messenger)),
+ raft_pool_token_(raft_pool_token),
+ rng_(GetRandomSeed32()) {
}
Status Peer::Init() {
- std::lock_guard<simple_spinlock> lock(peer_lock_);
- queue_->TrackPeer(peer_pb_.permanent_uuid());
- RETURN_NOT_OK(heartbeater_.Start());
+ {
+ std::lock_guard<simple_spinlock> l(peer_lock_);
+ queue_->TrackPeer(peer_pb_.permanent_uuid());
+ UpdateNextHeartbeatTimeUnlocked();
+ }
+
+ // Schedule the first heartbeat.
+ ScheduleNextHeartbeatAndMaybeSignalRequest(Status::OK());
return Status::OK();
}
@@ -217,10 +230,9 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
return;
}
- // If we're actually sending ops there's no need to heartbeat for a while,
- // reset the heartbeater
if (req_has_ops) {
- heartbeater_.Reset();
+ // If we're actually sending ops there's no need to heartbeat for a while.
+ UpdateNextHeartbeatTimeUnlocked();
}
MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
@@ -379,8 +391,6 @@ string Peer::LogPrefixUnlocked() const {
}
void Peer::Close() {
- WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater");
-
// If the peer is already closed return.
{
std::lock_guard<simple_spinlock> lock(peer_lock_);
@@ -398,6 +408,69 @@ Peer::~Peer() {
request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
}
+void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status) {
+ if (!status.ok()) {
+ // The reactor was shut down.
+ return;
+ }
+
+ // We must schedule the next callback with the lowest possible jittered
+ // time as the delay so that if SendNextRequest() resets the next heartbeat
+ // time with a very low value, the scheduled callback can still honor it.
+ //
+ // In effect, this means the callback period and the heartbeat period are
+ // decoupled from one another.
+ MonoDelta schedule_delay = GetHeartbeatJitterLowerBound();
+ bool signal_now = false;
+ {
+ std::lock_guard<simple_spinlock> l(peer_lock_);
+ if (closed_) {
+ // The peer was closed.
+ return;
+ }
+
+ MonoTime now = MonoTime::Now();
+ if (now < next_hb_time_) {
+ // It's not yet time to heartbeat. Shorten the scheduled delay if the next
+ // heartbeat is due sooner than that, but never lengthen it.
+ schedule_delay = std::min(schedule_delay, next_hb_time_ - now);
+ } else {
+ // It's time to heartbeat. Although the next heartbeat time is set now,
+ // it may be set again when we get to SendNextRequest().
+ signal_now = true;
+ UpdateNextHeartbeatTimeUnlocked();
+ }
+ }
+
+ if (signal_now) {
+ SignalRequest(true);
+ }
+
+ // Capture a shared_ptr reference into the submitted functor so that we're
+ // guaranteed that this object outlives the functor.
+ //
+ // Note: we use std::bind and not boost::bind here because the latter doesn't
+ // work with std::shared_ptr.
+ shared_ptr<Peer> s_this = shared_from_this();
+ messenger_->ScheduleOnReactor(
+ std::bind(&Peer::ScheduleNextHeartbeatAndMaybeSignalRequest, s_this,
+ std::placeholders::_1), schedule_delay);
+}
+
+MonoDelta Peer::GetHeartbeatJitterLowerBound() {
+ return MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms / 2);
+}
+
+void Peer::UpdateNextHeartbeatTimeUnlocked() {
+ DCHECK(peer_lock_.is_locked());
+
+ // We randomize the next heartbeat time between period/2 and period so
+ // multiple tablets on the same TS don't end up heartbeating in lockstep.
+ int64_t half_period_ms = GetHeartbeatJitterLowerBound().ToMilliseconds();
+ int64_t additional_jitter_ms = rng_.NextDoubleFraction() * half_period_ms;
+ next_hb_time_ = MonoTime::Now() + MonoDelta::FromMilliseconds(
+ half_period_ms + additional_jitter_ms);
+}
RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
gscoped_ptr<ConsensusServiceProxy> consensus_proxy)
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 016bad4..3dfc072 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -29,7 +29,7 @@
#include "kudu/rpc/rpc_controller.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
-#include "kudu/util/resettable_heartbeater.h"
+#include "kudu/util/random.h"
#include "kudu/util/semaphore.h"
#include "kudu/util/status.h"
@@ -105,18 +105,26 @@ class Peer : public std::enable_shared_from_this<Peer> {
// log entries) are assembled on 'raft_pool_token'.
// Response handling may also involve IO related to log-entry lookups and is
// also done on 'raft_pool_token'.
- static Status NewRemotePeer(const RaftPeerPB& peer_pb,
- const std::string& tablet_id,
- const std::string& leader_uuid,
+ static Status NewRemotePeer(RaftPeerPB peer_pb,
+ std::string tablet_id,
+ std::string leader_uuid,
PeerMessageQueue* queue,
ThreadPoolToken* raft_pool_token,
gscoped_ptr<PeerProxy> proxy,
+ std::shared_ptr<rpc::Messenger> messenger,
std::shared_ptr<Peer>* peer);
private:
- Peer(const RaftPeerPB& peer_pb, std::string tablet_id, std::string leader_uuid,
- gscoped_ptr<PeerProxy> proxy, PeerMessageQueue* queue,
- ThreadPoolToken* raft_pool_token);
+ // Returns the lowest possible jitter-induced heartbeat time delay.
+ static MonoDelta GetHeartbeatJitterLowerBound();
+
+ Peer(RaftPeerPB peer_pb,
+ std::string tablet_id,
+ std::string leader_uuid,
+ PeerMessageQueue* queue,
+ ThreadPoolToken* raft_pool_token,
+ gscoped_ptr<PeerProxy> proxy,
+ std::shared_ptr<rpc::Messenger> messenger);
void SendNextRequest(bool even_if_queue_empty);
@@ -145,6 +153,17 @@ class Peer : public std::enable_shared_from_this<Peer> {
std::string LogPrefixUnlocked() const;
+ // Schedules the next heartbeat for this peer, optionally sending a heartbeat
+ // now if it makes sense to do so. Initially called from Init() to schedule
+ // the first heartbeat, and subsequently as a callback running on a reactor thread.
+ void ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status);
+
+ // Resets the next time that we should heartbeat to this peer. Does not
+ // perform any actual scheduling.
+ //
+ // Must be called with 'peer_lock_' held.
+ void UpdateNextHeartbeatTimeUnlocked();
+
const std::string& tablet_id() const { return tablet_id_; }
const std::string tablet_id_;
@@ -173,17 +192,19 @@ class Peer : public std::enable_shared_from_this<Peer> {
rpc::RpcController controller_;
- // Heartbeater for remote peer implementations.
- // This will send status only requests to the remote peers
- // whenever we go more than 'FLAGS_raft_heartbeat_interval_ms'
- // without sending actual data.
- ResettableHeartbeater heartbeater_;
+ std::shared_ptr<rpc::Messenger> messenger_;
// Thread pool token used to construct requests to this peer.
//
// RaftConsensus owns this shared token and is responsible for destroying it.
ThreadPoolToken* raft_pool_token_;
+ // The next time that a heartbeat should be sent to this peer.
+ MonoTime next_hb_time_;
+
+ // A PRNG, used to generate jittered heartbeat time.
+ Random rng_;
+
// lock that protects Peer state changes, initialization, etc.
mutable simple_spinlock peer_lock_;
bool request_pending_ = false;
@@ -229,6 +250,8 @@ class PeerProxyFactory {
gscoped_ptr<PeerProxy>* proxy) = 0;
virtual ~PeerProxyFactory() {}
+
+ virtual const std::shared_ptr<rpc::Messenger>& messenger() const = 0;
};
// PeerProxy implementation that does RPC calls
@@ -264,10 +287,15 @@ class RpcPeerProxyFactory : public PeerProxyFactory {
public:
explicit RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger);
- virtual Status NewProxy(const RaftPeerPB& peer_pb,
- gscoped_ptr<PeerProxy>* proxy) OVERRIDE;
+ Status NewProxy(const RaftPeerPB& peer_pb,
+ gscoped_ptr<PeerProxy>* proxy) override;
+
+ ~RpcPeerProxyFactory();
+
+ const std::shared_ptr<rpc::Messenger>& messenger() const override {
+ return messenger_;
+ }
- virtual ~RpcPeerProxyFactory();
private:
std::shared_ptr<rpc::Messenger> messenger_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/leader_election-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index e523af0..915344a 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -17,6 +17,7 @@
#include "kudu/consensus/leader_election.h"
+#include <memory>
#include <string>
#include <vector>
@@ -30,8 +31,14 @@
#include "kudu/util/test_util.h"
namespace kudu {
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
namespace consensus {
+using std::shared_ptr;
using std::string;
using std::unordered_map;
using std::vector;
@@ -73,10 +80,16 @@ class FromMapPeerProxyFactory : public PeerProxyFactory {
return Status::OK();
}
+ const shared_ptr<rpc::Messenger>& messenger() const override {
+ return null_messenger_;
+ }
+
private:
// FYI, the tests may add and remove nodes from this map while we hold a
// reference to it.
const ProxyMap* const proxy_map_;
+
+ shared_ptr<rpc::Messenger> null_messenger_;
};
class LeaderElectionTest : public KuduTest {
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/peer_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 37aebcd..322d59c 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -77,6 +77,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
queue_,
raft_pool_token_,
std::move(peer_proxy),
+ peer_proxy_factory_->messenger(),
&remote_peer));
peers_.emplace(peer_pb.permanent_uuid(), std::move(remote_peer));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 3cf1d2b..f87de28 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -35,7 +35,6 @@
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/consensus/log_reader.h"
-#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/logical_clock.h"
#include "kudu/util/auto_release_pool.h"
@@ -833,17 +832,20 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
int repl0_init_count = follower0->update_calls_for_tests();
int repl1_init_count = follower1->update_calls_for_tests();
- // Now wait for about 4 times the hearbeat period the counters
- // should have increased 3/4 times.
+ // Now wait for about 4 times the heartbeat period; the counters
+ // should have increased between 3 and 8 times.
+ //
+ // Why the variance? Heartbeat timing is jittered such that the period
+ // between heartbeats can be anywhere from half the interval to the full interval.
SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4LL));
int repl0_final_count = follower0->update_calls_for_tests();
int repl1_final_count = follower1->update_calls_for_tests();
ASSERT_GE(repl0_final_count - repl0_init_count, 3);
- ASSERT_LE(repl0_final_count - repl0_init_count, 4);
+ ASSERT_LE(repl0_final_count - repl0_init_count, 8);
ASSERT_GE(repl1_final_count - repl1_init_count, 3);
- ASSERT_LE(repl1_final_count - repl1_init_count, 4);
+ ASSERT_LE(repl1_final_count - repl1_init_count, 8);
VerifyLogs(2, 0, 1);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index f216074..c45ec36 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -33,8 +33,8 @@
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
-#include "kudu/tserver/tablet_copy_source_session.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tablet_copy_source_session.h"
#include "kudu/util/crc.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"