You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/30 23:48:55 UTC

[2/2] kudu git commit: consensus_peers: schedule raft heartbeats through messenger

consensus_peers: schedule raft heartbeats through messenger

This patch switches Peer to a Messenger-based approach (i.e.,
Messenger::ScheduleOnReactor) for scheduling heartbeats. The problem with
ResettableHeartbeater is that it creates a dedicated thread per replica,
and I couldn't see an easy way to share that thread across replicas.

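The scheduling is inspired by libev's "Let the timer time out, but then
re-arm it as required" approach [1]. Callbacks are never canceled, because the
Messenger doesn't support cancellation. Instead, the next heartbeat time is
maintained as out-of-band state. We ensure that the callback period is
short enough to honor any heartbeat time. Heartbeat times are jittered
between half of --raft_heartbeat_interval_ms and the full interval. The
callback is therefore re-armed with a delay of at most half the interval,
which is the shortest possible jittered heartbeat delay.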

1. http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#code_ev_timer_code_relative_and_opti

Change-Id: Iac8e09fe02dd32885ef0cf644cb093b1c8e6afb8
Reviewed-on: http://gerrit.cloudera.org:8080/7331
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1070e764
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1070e764
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1070e764

Branch: refs/heads/master
Commit: 1070e764d97ce496325a277b9093f0ab3b9ee167
Parents: 26749af
Author: Adar Dembo <ad...@cloudera.com>
Authored: Wed Jun 28 14:27:31 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Jun 30 23:28:49 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  28 +++-
 src/kudu/consensus/consensus_peers-test.cc      |  13 +-
 src/kudu/consensus/consensus_peers.cc           | 129 +++++++++++++++----
 src/kudu/consensus/consensus_peers.h            |  58 ++++++---
 src/kudu/consensus/leader_election-test.cc      |  13 ++
 src/kudu/consensus/peer_manager.cc              |   1 +
 .../consensus/raft_consensus_quorum-test.cc     |  12 +-
 .../tserver/tablet_copy_source_session-test.cc  |   2 +-
 8 files changed, 199 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 20cc0a9..de3e4ea 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <boost/bind.hpp>
-#include <gmock/gmock.h>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -25,6 +23,9 @@
 #include <utility>
 #include <vector>
 
+#include <boost/bind.hpp>
+#include <gmock/gmock.h>
+
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_peers.h"
@@ -33,6 +34,7 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/server/clock.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
@@ -350,15 +352,21 @@ class NoOpTestPeerProxyFactory : public PeerProxyFactory {
  public:
   NoOpTestPeerProxyFactory() {
     CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
+    CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
   }
 
-  virtual Status NewProxy(const consensus::RaftPeerPB& peer_pb,
-                          gscoped_ptr<PeerProxy>* proxy) OVERRIDE {
+  Status NewProxy(const consensus::RaftPeerPB& peer_pb,
+                  gscoped_ptr<PeerProxy>* proxy) override {
     proxy->reset(new NoOpTestPeerProxy(pool_.get(), peer_pb));
     return Status::OK();
   }
 
+  const std::shared_ptr<rpc::Messenger>& messenger() const override {
+    return messenger_;
+  }
+ private:
   gscoped_ptr<ThreadPool> pool_;
+  std::shared_ptr<rpc::Messenger> messenger_;
 };
 
 typedef std::unordered_map<std::string, scoped_refptr<RaftConsensus> > TestPeerMap;
@@ -556,10 +564,11 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
   explicit LocalTestPeerProxyFactory(TestPeerMapManager* peers)
     : peers_(peers) {
     CHECK_OK(ThreadPoolBuilder("test-peer-pool").set_max_threads(3).Build(&pool_));
+    CHECK_OK(rpc::MessengerBuilder("test").Build(&messenger_));
   }
 
-  virtual Status NewProxy(const consensus::RaftPeerPB& peer_pb,
-                          gscoped_ptr<PeerProxy>* proxy) OVERRIDE {
+  Status NewProxy(const consensus::RaftPeerPB& peer_pb,
+                  gscoped_ptr<PeerProxy>* proxy) override {
     LocalTestPeerProxy* new_proxy = new LocalTestPeerProxy(peer_pb.permanent_uuid(),
                                                            pool_.get(),
                                                            peers_);
@@ -568,12 +577,17 @@ class LocalTestPeerProxyFactory : public PeerProxyFactory {
     return Status::OK();
   }
 
-  virtual const vector<LocalTestPeerProxy*>& GetProxies() {
+  const vector<LocalTestPeerProxy*>& GetProxies() {
     return proxies_;
   }
 
+  const std::shared_ptr<rpc::Messenger>& messenger() const override {
+    return messenger_;
+  }
+
  private:
   gscoped_ptr<ThreadPool> pool_;
+  std::shared_ptr<rpc::Messenger> messenger_;
   TestPeerMapManager* const peers_;
     // NOTE: There is no need to delete this on the dctor because proxies are externally managed
   vector<LocalTestPeerProxy*> proxies_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 1307b5b..2ff9f41 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -28,6 +28,7 @@
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/test_macros.h"
@@ -41,6 +42,8 @@ namespace consensus {
 
 using log::Log;
 using log::LogOptions;
+using rpc::Messenger;
+using rpc::MessengerBuilder;
 using std::shared_ptr;
 using std::unique_ptr;
 
@@ -81,10 +84,14 @@ class ConsensusPeersTest : public KuduTest {
         FakeRaftPeerPB(kLeaderUuid),
         kTabletId,
         raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+
+    MessengerBuilder bld("test");
+    ASSERT_OK(bld.Build(&messenger_));
   }
 
   virtual void TearDown() OVERRIDE {
     ASSERT_OK(log_->WaitUntilAllFlushed());
+    messenger_->Shutdown();
   }
 
   DelayablePeerProxy<NoOpTestPeerProxy>* NewRemotePeer(
@@ -95,12 +102,13 @@ class ConsensusPeersTest : public KuduTest {
     auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
         raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
     gscoped_ptr<PeerProxy> proxy(proxy_ptr);
-    CHECK_OK(Peer::NewRemotePeer(peer_pb,
+    CHECK_OK(Peer::NewRemotePeer(std::move(peer_pb),
                                  kTabletId,
                                  kLeaderUuid,
                                  message_queue_.get(),
                                  raft_pool_token_.get(),
                                  std::move(proxy),
+                                 messenger_,
                                  peer));
     return proxy_ptr;
   }
@@ -139,6 +147,7 @@ class ConsensusPeersTest : public KuduTest {
   LogOptions options_;
   unique_ptr<ThreadPoolToken> raft_pool_token_;
   scoped_refptr<server::Clock> clock_;
+  shared_ptr<Messenger> messenger_;
 };
 
 
@@ -246,6 +255,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
                                 message_queue_.get(),
                                 raft_pool_token_.get(),
                                 gscoped_ptr<PeerProxy>(mock_proxy),
+                                messenger_,
                                 &peer));
 
   // Make the peer respond without making any progress -- it always returns
@@ -284,6 +294,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
                                 message_queue_.get(),
                                 raft_pool_token_.get(),
                                 gscoped_ptr<PeerProxy>(mock_proxy),
+                                messenger_,
                                 &peer));
 
   // Initial response has to be successful -- otherwise we'll consider the peer

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index d1c44b6..751621c 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -18,14 +18,15 @@
 #include "kudu/consensus/consensus.proxy.h"
 
 #include <algorithm>
-#include <boost/bind.hpp>
-#include <gflags/gflags.h>
-#include <glog/logging.h>
+#include <functional>
 #include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/consensus_queue.h"
@@ -33,12 +34,15 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/messenger.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/threadpool.h"
 
 DEFINE_int32(consensus_rpc_timeout_ms, 1000,
@@ -80,45 +84,54 @@ using rpc::RpcController;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;
 
-Status Peer::NewRemotePeer(const RaftPeerPB& peer_pb,
-                           const string& tablet_id,
-                           const string& leader_uuid,
+Status Peer::NewRemotePeer(RaftPeerPB peer_pb,
+                           string tablet_id,
+                           string leader_uuid,
                            PeerMessageQueue* queue,
                            ThreadPoolToken* raft_pool_token,
                            gscoped_ptr<PeerProxy> proxy,
+                           shared_ptr<Messenger> messenger,
                            shared_ptr<Peer>* peer) {
 
-  shared_ptr<Peer> new_peer(new Peer(peer_pb,
-                                     tablet_id,
-                                     leader_uuid,
-                                     std::move(proxy),
+  shared_ptr<Peer> new_peer(new Peer(std::move(peer_pb),
+                                     std::move(tablet_id),
+                                     std::move(leader_uuid),
                                      queue,
-                                     raft_pool_token));
+                                     raft_pool_token,
+                                     std::move(proxy),
+                                     std::move(messenger)));
   RETURN_NOT_OK(new_peer->Init());
   *peer = std::move(new_peer);
   return Status::OK();
 }
 
-Peer::Peer(const RaftPeerPB& peer_pb, string tablet_id, string leader_uuid,
-           gscoped_ptr<PeerProxy> proxy, PeerMessageQueue* queue,
-           ThreadPoolToken* raft_pool_token)
+Peer::Peer(RaftPeerPB peer_pb,
+           string tablet_id,
+           string leader_uuid,
+           PeerMessageQueue* queue,
+           ThreadPoolToken* raft_pool_token,
+           gscoped_ptr<PeerProxy> proxy,
+           shared_ptr<Messenger> messenger)
     : tablet_id_(std::move(tablet_id)),
       leader_uuid_(std::move(leader_uuid)),
-      peer_pb_(peer_pb),
+      peer_pb_(std::move(peer_pb)),
       proxy_(std::move(proxy)),
       queue_(queue),
       failed_attempts_(0),
-      heartbeater_(
-          peer_pb.permanent_uuid(),
-          MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms),
-          boost::bind(&Peer::SignalRequest, this, true)),
-      raft_pool_token_(raft_pool_token) {
+      messenger_(std::move(messenger)),
+      raft_pool_token_(raft_pool_token),
+      rng_(GetRandomSeed32()) {
 }
 
 Status Peer::Init() {
-  std::lock_guard<simple_spinlock> lock(peer_lock_);
-  queue_->TrackPeer(peer_pb_.permanent_uuid());
-  RETURN_NOT_OK(heartbeater_.Start());
+  {
+    std::lock_guard<simple_spinlock> l(peer_lock_);
+    queue_->TrackPeer(peer_pb_.permanent_uuid());
+    UpdateNextHeartbeatTimeUnlocked();
+  }
+
+  // Schedule the first heartbeat.
+  ScheduleNextHeartbeatAndMaybeSignalRequest(Status::OK());
   return Status::OK();
 }
 
@@ -217,10 +230,9 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
     return;
   }
 
-  // If we're actually sending ops there's no need to heartbeat for a while,
-  // reset the heartbeater
   if (req_has_ops) {
-    heartbeater_.Reset();
+    // If we're actually sending ops there's no need to heartbeat for a while.
+    UpdateNextHeartbeatTimeUnlocked();
   }
 
   MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction);
@@ -379,8 +391,6 @@ string Peer::LogPrefixUnlocked() const {
 }
 
 void Peer::Close() {
-  WARN_NOT_OK(heartbeater_.Stop(), "Could not stop heartbeater");
-
   // If the peer is already closed return.
   {
     std::lock_guard<simple_spinlock> lock(peer_lock_);
@@ -398,6 +408,69 @@ Peer::~Peer() {
   request_.mutable_ops()->ExtractSubrange(0, request_.ops_size(), nullptr);
 }
 
+void Peer::ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status) {
+  if (!status.ok()) {
+    // The reactor was shut down.
+    return;
+  }
+
+  // We must schedule the next callback with the lowest possible jittered
+  // time as the delay so that if SendNextRequest() resets the next heartbeat
+  // time with a very low value, the scheduled callback can still honor it.
+  //
+  // In effect, this means the callback period and the heartbeat period are
+  // decoupled from one another.
+  MonoDelta schedule_delay = GetHeartbeatJitterLowerBound();
+  bool signal_now = false;
+  {
+    std::lock_guard<simple_spinlock> l(peer_lock_);
+    if (closed_) {
+      // The peer was closed.
+      return;
+    }
+
+    MonoTime now = MonoTime::Now();
+    if (now < next_hb_time_) {
+      // It's not yet time to heartbeat. Reduce the scheduled delay if enough
+      // time has elapsed, but don't increase it.
+      schedule_delay = std::min(schedule_delay, next_hb_time_ - now);
+    } else {
+      // It's time to heartbeat. Although the next heartbeat time is set now,
+      // it may be set again when we get to SendNextRequest().
+      signal_now = true;
+      UpdateNextHeartbeatTimeUnlocked();
+    }
+  }
+
+  if (signal_now) {
+    SignalRequest(true);
+  }
+
+  // Capture a shared_ptr reference into the submitted functor so that we're
+  // guaranteed that this object outlives the functor.
+  //
+  // Note: we use std::bind and not boost::bind here because the latter doesn't
+  // work with std::shared_ptr.
+  shared_ptr<Peer> s_this = shared_from_this();
+  messenger_->ScheduleOnReactor(
+      std::bind(&Peer::ScheduleNextHeartbeatAndMaybeSignalRequest, s_this,
+                std::placeholders::_1), schedule_delay);
+}
+
+MonoDelta Peer::GetHeartbeatJitterLowerBound() {
+  return MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms / 2);
+}
+
+void Peer::UpdateNextHeartbeatTimeUnlocked() {
+  DCHECK(peer_lock_.is_locked());
+
+  // We randomize the next heartbeat time between period/2 and period so
+  // multiple tablets on the same TS don't end up heartbeating in lockstep.
+  int64_t half_period_ms = GetHeartbeatJitterLowerBound().ToMilliseconds();
+  int64_t additional_jitter_ms = rng_.NextDoubleFraction() * half_period_ms;
+  next_hb_time_ =  MonoTime::Now() + MonoDelta::FromMilliseconds(
+      half_period_ms + additional_jitter_ms);
+}
 
 RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
                            gscoped_ptr<ConsensusServiceProxy> consensus_proxy)

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 016bad4..3dfc072 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -29,7 +29,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
-#include "kudu/util/resettable_heartbeater.h"
+#include "kudu/util/random.h"
 #include "kudu/util/semaphore.h"
 #include "kudu/util/status.h"
 
@@ -105,18 +105,26 @@ class Peer : public std::enable_shared_from_this<Peer> {
   // log entries) are assembled on 'raft_pool_token'.
   // Response handling may also involve IO related to log-entry lookups and is
   // also done on 'raft_pool_token'.
-  static Status NewRemotePeer(const RaftPeerPB& peer_pb,
-                              const std::string& tablet_id,
-                              const std::string& leader_uuid,
+  static Status NewRemotePeer(RaftPeerPB peer_pb,
+                              std::string tablet_id,
+                              std::string leader_uuid,
                               PeerMessageQueue* queue,
                               ThreadPoolToken* raft_pool_token,
                               gscoped_ptr<PeerProxy> proxy,
+                              std::shared_ptr<rpc::Messenger> messenger,
                               std::shared_ptr<Peer>* peer);
 
  private:
-  Peer(const RaftPeerPB& peer_pb, std::string tablet_id, std::string leader_uuid,
-       gscoped_ptr<PeerProxy> proxy, PeerMessageQueue* queue,
-       ThreadPoolToken* raft_pool_token);
+  // Returns the lowest possible jitter-induced heartbeat time delay.
+  static MonoDelta GetHeartbeatJitterLowerBound();
+
+  Peer(RaftPeerPB peer_pb,
+       std::string tablet_id,
+       std::string leader_uuid,
+       PeerMessageQueue* queue,
+       ThreadPoolToken* raft_pool_token,
+       gscoped_ptr<PeerProxy> proxy,
+       std::shared_ptr<rpc::Messenger> messenger);
 
   void SendNextRequest(bool even_if_queue_empty);
 
@@ -145,6 +153,17 @@ class Peer : public std::enable_shared_from_this<Peer> {
 
   std::string LogPrefixUnlocked() const;
 
+  // Schedules the next heartbeat for this peer, optionally sending a heartbeat
+  // now if it makes sense to do so. Initially called from Init() to schedule
+  // the first heartbeat, and subsequently as a callback running on a reactor thread.
+  void ScheduleNextHeartbeatAndMaybeSignalRequest(const Status& status);
+
+  // Resets the next time that we should heartbeat to this peer. Does not
+  // perform any actual scheduling.
+  //
+  // Must be called with 'peer_lock_' held.
+  void UpdateNextHeartbeatTimeUnlocked();
+
   const std::string& tablet_id() const { return tablet_id_; }
 
   const std::string tablet_id_;
@@ -173,17 +192,19 @@ class Peer : public std::enable_shared_from_this<Peer> {
 
   rpc::RpcController controller_;
 
-  // Heartbeater for remote peer implementations.
-  // This will send status only requests to the remote peers
-  // whenever we go more than 'FLAGS_raft_heartbeat_interval_ms'
-  // without sending actual data.
-  ResettableHeartbeater heartbeater_;
+  std::shared_ptr<rpc::Messenger> messenger_;
 
   // Thread pool token used to construct requests to this peer.
   //
   // RaftConsensus owns this shared token and is responsible for destroying it.
   ThreadPoolToken* raft_pool_token_;
 
+  // The next time that a heartbeat should be sent to this peer.
+  MonoTime next_hb_time_;
+
+  // A PRNG, used to generate jittered heartbeat time.
+  Random rng_;
+
   // lock that protects Peer state changes, initialization, etc.
   mutable simple_spinlock peer_lock_;
   bool request_pending_ = false;
@@ -229,6 +250,8 @@ class PeerProxyFactory {
                           gscoped_ptr<PeerProxy>* proxy) = 0;
 
   virtual ~PeerProxyFactory() {}
+
+  virtual const std::shared_ptr<rpc::Messenger>& messenger() const = 0;
 };
 
 // PeerProxy implementation that does RPC calls
@@ -264,10 +287,15 @@ class RpcPeerProxyFactory : public PeerProxyFactory {
  public:
   explicit RpcPeerProxyFactory(std::shared_ptr<rpc::Messenger> messenger);
 
-  virtual Status NewProxy(const RaftPeerPB& peer_pb,
-                          gscoped_ptr<PeerProxy>* proxy) OVERRIDE;
+  Status NewProxy(const RaftPeerPB& peer_pb,
+                  gscoped_ptr<PeerProxy>* proxy) override;
+
+  ~RpcPeerProxyFactory();
+
+  const std::shared_ptr<rpc::Messenger>& messenger() const override {
+    return messenger_;
+  }
 
-  virtual ~RpcPeerProxyFactory();
  private:
   std::shared_ptr<rpc::Messenger> messenger_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/leader_election-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index e523af0..915344a 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/consensus/leader_election.h"
 
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -30,8 +31,14 @@
 #include "kudu/util/test_util.h"
 
 namespace kudu {
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
 namespace consensus {
 
+using std::shared_ptr;
 using std::string;
 using std::unordered_map;
 using std::vector;
@@ -73,10 +80,16 @@ class FromMapPeerProxyFactory : public PeerProxyFactory {
     return Status::OK();
   }
 
+  const shared_ptr<rpc::Messenger>& messenger() const override {
+    return null_messenger_;
+  }
+
  private:
   // FYI, the tests may add and remove nodes from this map while we hold a
   // reference to it.
   const ProxyMap* const proxy_map_;
+
+  shared_ptr<rpc::Messenger> null_messenger_;
 };
 
 class LeaderElectionTest : public KuduTest {

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/peer_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 37aebcd..322d59c 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -77,6 +77,7 @@ Status PeerManager::UpdateRaftConfig(const RaftConfigPB& config) {
                                       queue_,
                                       raft_pool_token_,
                                       std::move(peer_proxy),
+                                      peer_proxy_factory_->messenger(),
                                       &remote_peer));
     peers_.emplace(peer_pb.permanent_uuid(), std::move(remote_peer));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 3cf1d2b..f87de28 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -35,7 +35,6 @@
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/consensus/log_reader.h"
-#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/server/logical_clock.h"
 #include "kudu/util/auto_release_pool.h"
@@ -833,17 +832,20 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
   int repl0_init_count = follower0->update_calls_for_tests();
   int repl1_init_count = follower1->update_calls_for_tests();
 
-  // Now wait for about 4 times the hearbeat period the counters
-  // should have increased 3/4 times.
+  // Now wait for about 4 times the heartbeat period; the counters
+  // should have increased between 3 to 8 times.
+  //
+  // Why the variance? Heartbeat timing is jittered such that the period
+  // between heartbeats can be anywhere from half the interval to the full interval.
   SleepFor(MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms * 4LL));
 
   int repl0_final_count = follower0->update_calls_for_tests();
   int repl1_final_count = follower1->update_calls_for_tests();
 
   ASSERT_GE(repl0_final_count - repl0_init_count, 3);
-  ASSERT_LE(repl0_final_count - repl0_init_count, 4);
+  ASSERT_LE(repl0_final_count - repl0_init_count, 8);
   ASSERT_GE(repl1_final_count - repl1_init_count, 3);
-  ASSERT_LE(repl1_final_count - repl1_init_count, 4);
+  ASSERT_LE(repl1_final_count - repl1_init_count, 8);
 
   VerifyLogs(2, 0, 1);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/1070e764/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index f216074..c45ec36 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -33,8 +33,8 @@
 #include "kudu/gutil/strings/fastmem.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
-#include "kudu/tserver/tablet_copy_source_session.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tablet_copy_source_session.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"