You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/08/01 22:11:40 UTC

[3/9] kudu git commit: KUDU-1358 (part 2): heartbeat to every master

KUDU-1358 (part 2): heartbeat to every master

Now that followers accept heartbeats, let's modify the tserver to send one
to every master. Spawning a heartbeater thread for each master seemed like
the natural way to do this; it should simplify dynamic master changes in the
future (i.e. just add or remove threads as needed).

The "dirty tablet" state is now encapsulated in the heartbeater threads
themselves, and the heartbeater must "fan out" to manipulate all of it. It's
a little noisy but I think it's reasonable. The alternative is for this
state to remain in the TSTabletManager, for the heartbeater to continue
tracking which master is the leader, and for it to only send tablet reports
to that master. This can be done with a few changes (e.g. adding term
numbers to the heartbeat response), but the only benefit is reduced network
traffic when tablets are dirty, so that didn't seem worth the complexity.

There's no new test here, but this code path is exercised in the test I
reenabled, and in the new stress test (follow-on patch).

Change-Id: Ic85ac4193462d21c989dbd7874b451e8eaab8e3e
Reviewed-on: http://gerrit.cloudera.org:8080/3610
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/284591a8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/284591a8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/284591a8

Branch: refs/heads/master
Commit: 284591a866cc893ee596784298eaac296f0e1eac
Parents: 8618ae2
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Jun 13 16:57:39 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Jul 31 23:45:56 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/master_failover-itest.cc  |  10 +-
 .../ts_tablet_manager-itest.cc                  |  13 +-
 src/kudu/tserver/heartbeater.cc                 | 320 ++++++++++++-------
 src/kudu/tserver/heartbeater.h                  |  38 ++-
 src/kudu/tserver/ts_tablet_manager-test.cc      |  45 ++-
 src/kudu/tserver/ts_tablet_manager.cc           |  89 ++----
 src/kudu/tserver/ts_tablet_manager.h            |  60 +---
 7 files changed, 311 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/integration-tests/master_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_failover-itest.cc b/src/kudu/integration-tests/master_failover-itest.cc
index 1b9f5df..95866be 100644
--- a/src/kudu/integration-tests/master_failover-itest.cc
+++ b/src/kudu/integration-tests/master_failover-itest.cc
@@ -148,12 +148,7 @@ class MasterFailoverTest : public KuduTest {
 // Test that synchronous CreateTable (issue CreateTable call and then
 // wait until the table has been created) works even when the original
 // leader master has been paused.
-//
-// Temporarily disabled since multi-master isn't supported yet.
-// This test fails as of KUDU-1138, since the tablet servers haven't
-// registered with the follower master, and thus it's likely to deny
-// the CreateTable request thinking there are no TS available.
-TEST_F(MasterFailoverTest, DISABLED_TestCreateTableSync) {
+TEST_F(MasterFailoverTest, TestCreateTableSync) {
   const char* kTableName = "testCreateTableSync";
 
   if (!AllowSlowTests()) {
@@ -184,9 +179,6 @@ TEST_F(MasterFailoverTest, DISABLED_TestCreateTableSync) {
 // Test that we can issue a CreateTable call, pause the leader master
 // immediately after, then verify that the table has been created on
 // the newly elected leader master.
-//
-// TODO enable this test once flakiness issues are worked out and
-// eliminated on test machines.
 TEST_F(MasterFailoverTest, TestPauseAfterCreateTableIssued) {
   const char* kTableName = "testPauseAfterCreateTableIssued";
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/integration-tests/ts_tablet_manager-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_tablet_manager-itest.cc b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
index 2a2547e..ad78aaa 100644
--- a/src/kudu/integration-tests/ts_tablet_manager-itest.cc
+++ b/src/kudu/integration-tests/ts_tablet_manager-itest.cc
@@ -34,6 +34,7 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/tablet_peer.h"
+#include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tserver_admin.proxy.h"
@@ -166,16 +167,18 @@ TEST_F(TsTabletManagerITest, TestReportNewLeaderOnLeaderChange) {
     for (int replica = 0; replica < kNumReplicas; replica++) {
       // The MarkDirty() callback is on an async thread so it might take the
       // follower a few milliseconds to execute it. Wait for that to happen.
-      TSTabletManager* tablet_manager =
-          cluster_->mini_tablet_server(replica)->server()->tablet_manager();
+      Heartbeater* heartbeater =
+          cluster_->mini_tablet_server(replica)->server()->heartbeater();
+      vector<TabletReportPB> reports;
       for (int retry = 0; retry <= 12; retry++) {
-        if (tablet_manager->GetNumDirtyTabletsForTests() > 0) break;
+        reports = heartbeater->GenerateIncrementalTabletReportsForTests();
+        ASSERT_EQ(1, reports.size());
+        if (!reports[0].updated_tablets().empty()) break;
         SleepFor(MonoDelta::FromMilliseconds(1 << retry));
       }
 
       // Ensure that our tablet reports are consistent.
-      TabletReportPB report;
-      tablet_manager->GenerateIncrementalTabletReport(&report);
+      TabletReportPB& report = reports[0];
       ASSERT_EQ(1, report.updated_tablets_size()) << "Wrong report size:\n" << report.DebugString();
       ReportedTabletPB reported_tablet = report.updated_tablets(0);
       ASSERT_TRUE(reported_tablet.has_committed_consensus_state());

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/tserver/heartbeater.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index f36d59f..05c5a83 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -20,6 +20,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <memory>
+#include <string>
 #include <vector>
 
 #include "kudu/common/wire_protocol.h"
@@ -60,6 +61,7 @@ using kudu::master::GetLeaderMasterRpc;
 using kudu::master::ListMastersResponsePB;
 using kudu::master::Master;
 using kudu::master::MasterServiceProxy;
+using kudu::master::TabletReportPB;
 using kudu::rpc::RpcController;
 using std::shared_ptr;
 using strings::Substitute;
@@ -92,16 +94,22 @@ Status MasterServiceProxyForHostPort(const HostPort& hostport,
 // This is basically the "PIMPL" pattern.
 class Heartbeater::Thread {
  public:
-  Thread(const TabletServerOptions& opts, TabletServer* server);
+  Thread(const HostPort& master_address, TabletServer* server);
 
   Status Start();
   Status Stop();
   void TriggerASAP();
+  void MarkTabletDirty(const string& tablet_id, const string& reason);
+  void GenerateIncrementalTabletReport(TabletReportPB* report);
+  void GenerateFullTabletReport(TabletReportPB* report);
+
+  // Mark that the master successfully received and processed the given
+  // tablet report. This uses the report sequence number to "un-dirty" any
+  // tablets which have not changed since the acknowledged report.
+  void MarkTabletReportAcknowledged(const TabletReportPB& report);
 
  private:
   void RunThread();
-  Status FindLeaderMaster(const MonoTime& deadline,
-                          HostPort* leader_hostport);
   Status ConnectToMaster();
   int GetMinimumHeartbeatMillis() const;
   int GetMillisUntilNextHeartbeat() const;
@@ -110,16 +118,12 @@ class Heartbeater::Thread {
   void SetupCommonField(master::TSToMasterCommonPB* common);
   bool IsCurrentThread() const;
 
-  // The hosts/ports of masters that we may heartbeat to.
+  // The host and port of the master that this thread will heartbeat to.
   //
   // We keep the HostPort around rather than a Sockaddr because the
-  // masters may change IP addresses, and we'd like to re-resolve on
+  // master may change IP address, and we'd like to re-resolve on
   // every new attempt at connecting.
-  vector<HostPort> master_addrs_;
-
-  // Index of the master we last succesfully obtained the master
-  // consensus configuration information from.
-  int last_locate_master_idx_;
+  HostPort master_address_;
 
   // The server for which we are heartbeating.
   TabletServer* const server_;
@@ -127,9 +131,6 @@ class Heartbeater::Thread {
   // The actual running thread (NULL before it is started)
   scoped_refptr<kudu::Thread> thread_;
 
-  // Host and port of the most recent leader master.
-  HostPort leader_master_hostport_;
-
   // Current RPC proxy to the leader master.
   gscoped_ptr<master::MasterServiceProxy> proxy_;
 
@@ -140,6 +141,28 @@ class Heartbeater::Thread {
   // This is tracked so as to back-off heartbeating.
   int consecutive_failed_heartbeats_;
 
+  // Each tablet report is assigned a sequence number, so that subsequent
+  // tablet reports only need to re-report those tablets which have
+  // changed since the last report. Each tablet tracks the sequence
+  // number at which it became dirty.
+  struct TabletReportState {
+    int32_t change_seq;
+  };
+  typedef std::unordered_map<std::string, TabletReportState> DirtyMap;
+
+  // Tablets to include in the next incremental tablet report.
+  // When a tablet is added/removed/changed locally and needs to be
+  // reported to the master, an entry is added to this map.
+  DirtyMap dirty_tablets_;
+
+  // Lock protecting 'dirty_tablets_'.
+  //
+  // Should not be held at the same time as mutex_.
+  mutable simple_spinlock dirty_tablets_lock_;
+
+  // Next tablet report seqno.
+  std::atomic_int next_report_seq_;
+
   // Mutex/condition pair to trigger the heartbeater thread
   // to either heartbeat early or exit.
   Mutex mutex_;
@@ -156,95 +179,105 @@ class Heartbeater::Thread {
 // Heartbeater
 ////////////////////////////////////////////////////////////
 
-Heartbeater::Heartbeater(const TabletServerOptions& opts, TabletServer* server)
-  : thread_(new Thread(opts, server)) {
+Heartbeater::Heartbeater(const TabletServerOptions& opts, TabletServer* server) {
+  DCHECK_GT(opts.master_addresses.size(), 0);
+
+  for (const auto& addr : opts.master_addresses) {
+    threads_.emplace_back(new Thread(addr, server));
+  }
 }
 Heartbeater::~Heartbeater() {
   WARN_NOT_OK(Stop(), "Unable to stop heartbeater thread");
 }
 
-Status Heartbeater::Start() { return thread_->Start(); }
-Status Heartbeater::Stop() { return thread_->Stop(); }
-void Heartbeater::TriggerASAP() { thread_->TriggerASAP(); }
+Status Heartbeater::Start() {
+  for (int i = 0; i < threads_.size(); i++) {
+    Status first_failure = threads_[i]->Start();
+    if (!first_failure.ok()) {
+      // On error, stop whichever threads were started.
+      for (int j = 0; j < i; j++) {
+        // Ignore failures; we should try to stop every thread, and
+        // 'first_failure' is the most interesting status anyway.
+        threads_[j]->Stop();
+      }
+      return first_failure;
+    }
+  }
 
-////////////////////////////////////////////////////////////
-// Heartbeater::Thread
-////////////////////////////////////////////////////////////
+  return Status::OK();
+}
+Status Heartbeater::Stop() {
+  // Stop all threads and return the first failure (if there was one).
+  Status first_failure;
+  for (const auto& thread : threads_) {
+    Status s = thread->Stop();
+    if (!s.ok() && first_failure.ok()) {
+      first_failure = s;
+    }
+  }
+  return first_failure;
+}
 
-Heartbeater::Thread::Thread(const TabletServerOptions& opts, TabletServer* server)
-  : master_addrs_(opts.master_addresses),
-    last_locate_master_idx_(0),
-    server_(server),
-    consecutive_failed_heartbeats_(0),
-    cond_(&mutex_),
-    should_run_(false),
-    heartbeat_asap_(true) {
-  CHECK(!master_addrs_.empty());
+void Heartbeater::TriggerASAP() {
+  for (const auto& thread : threads_) {
+    thread->TriggerASAP();
+  }
 }
 
-namespace {
-void LeaderMasterCallback(HostPort* dst_hostport,
-                          Synchronizer* sync,
-                          const Status& status,
-                          const HostPort& result) {
-  if (status.ok()) {
-    *dst_hostport = result;
+void Heartbeater::MarkTabletDirty(const string& tablet_id, const string& reason) {
+  for (const auto& thread : threads_) {
+    thread->MarkTabletDirty(tablet_id, reason);
   }
-  sync->StatusCB(status);
 }
-} // anonymous namespace
 
-Status Heartbeater::Thread::FindLeaderMaster(const MonoTime& deadline,
-                                             HostPort* leader_hostport) {
-  Status s = Status::OK();
-  if (master_addrs_.size() == 1) {
-    // "Shortcut" the process when a single master is specified.
-    *leader_hostport = master_addrs_[0];
-    return Status::OK();
+vector<TabletReportPB> Heartbeater::GenerateIncrementalTabletReportsForTests() {
+  vector<TabletReportPB> results;
+  for (const auto& thread : threads_) {
+    TabletReportPB report;
+    thread->GenerateIncrementalTabletReport(&report);
+    results.emplace_back(std::move(report));
   }
-  vector<Sockaddr> master_sock_addrs;
-  for (const HostPort& master_addr : master_addrs_) {
-    vector<Sockaddr> addrs;
-    Status s = master_addr.ResolveAddresses(&addrs);
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to resolve address '" << master_addr.ToString()
-                   << "': " << s.ToString();
-      continue;
-    }
-    if (addrs.size() > 1) {
-      LOG(WARNING) << "Master address '" << master_addr.ToString() << "' "
-                   << "resolves to " << addrs.size() << " different addresses. Using "
-                   << addrs[0].ToString();
-    }
-    master_sock_addrs.push_back(addrs[0]);
+  return results;
+}
+
+vector<TabletReportPB> Heartbeater::GenerateFullTabletReportsForTests() {
+  vector<TabletReportPB>  results;
+  for (const auto& thread : threads_) {
+    TabletReportPB report;
+    thread->GenerateFullTabletReport(&report);
+    results.emplace_back(std::move(report));
   }
-  if (master_sock_addrs.empty()) {
-    return Status::NotFound("unable to resolve any of the master addresses!");
+  return results;
+}
+
+void Heartbeater::MarkTabletReportsAcknowledgedForTests(
+    const vector<TabletReportPB>& reports) {
+  CHECK_EQ(reports.size(), threads_.size());
+
+  for (int i = 0; i < reports.size(); i++) {
+    threads_[i]->MarkTabletReportAcknowledged(reports[i]);
   }
-  Synchronizer sync;
-  scoped_refptr<GetLeaderMasterRpc> rpc(
-      new GetLeaderMasterRpc(Bind(&LeaderMasterCallback,
-                                  leader_hostport,
-                                  &sync),
-                             std::move(master_sock_addrs),
-                             deadline,
-                             MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms),
-                             server_->messenger()));
-  rpc->SendRpc();
-  return sync.Wait();
+}
+
+////////////////////////////////////////////////////////////
+// Heartbeater::Thread
+////////////////////////////////////////////////////////////
+
+Heartbeater::Thread::Thread(const HostPort& master_address, TabletServer* server)
+  : master_address_(master_address),
+    server_(server),
+    consecutive_failed_heartbeats_(0),
+    next_report_seq_(0),
+    cond_(&mutex_),
+    should_run_(false),
+    heartbeat_asap_(true) {
 }
 
 Status Heartbeater::Thread::ConnectToMaster() {
-  vector<Sockaddr> addrs;
-  MonoTime deadline = MonoTime::Now(MonoTime::FINE);
-  deadline.AddDelta(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
-  // TODO send heartbeats without tablet reports to non-leader masters.
-  RETURN_NOT_OK(FindLeaderMaster(deadline, &leader_master_hostport_));
   gscoped_ptr<MasterServiceProxy> new_proxy;
-  MasterServiceProxyForHostPort(leader_master_hostport_,
+  MasterServiceProxyForHostPort(master_address_,
                                 server_->messenger(),
                                 &new_proxy);
-  RETURN_NOT_OK(leader_master_hostport_.ResolveAddresses(&addrs));
 
   // Ping the master to verify that it's alive.
   master::PingRequestPB req;
@@ -252,8 +285,8 @@ Status Heartbeater::Thread::ConnectToMaster() {
   RpcController rpc;
   rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
   RETURN_NOT_OK_PREPEND(new_proxy->Ping(req, &resp, &rpc),
-                        Substitute("Failed to ping master at $0", addrs[0].ToString()));
-  LOG(INFO) << "Connected to a leader master server at " << leader_master_hostport_.ToString();
+                        Substitute("Failed to ping master at $0", master_address_.ToString()));
+  LOG(INFO) << "Connected to a master server at " << master_address_.ToString();
   proxy_.reset(new_proxy.release());
   return Status::OK();
 }
@@ -326,47 +359,39 @@ Status Heartbeater::Thread::DoHeartbeat() {
   }
 
   if (last_hb_response_.needs_full_tablet_report()) {
-    LOG(INFO) << "Sending a full tablet report to master...";
-    server_->tablet_manager()->GenerateFullTabletReport(
-      req.mutable_tablet_report());
+    LOG(INFO) << Substitute("Sending a full tablet report to master $0...",
+                            master_address_.ToString());
+    GenerateFullTabletReport(req.mutable_tablet_report());
   } else {
-    VLOG(2) << "Sending an incremental tablet report to master...";
-    server_->tablet_manager()->GenerateIncrementalTabletReport(
-      req.mutable_tablet_report());
+    VLOG(2) << Substitute("Sending an incremental tablet report to master $0...",
+                          master_address_.ToString());
+    GenerateIncrementalTabletReport(req.mutable_tablet_report());
   }
   req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
 
   RpcController rpc;
-  rpc.set_timeout(MonoDelta::FromSeconds(10));
+  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));
 
   VLOG(2) << "Sending heartbeat:\n" << req.DebugString();
   master::TSHeartbeatResponsePB resp;
   RETURN_NOT_OK_PREPEND(proxy_->TSHeartbeat(req, &resp, &rpc),
-                        "Failed to send heartbeat");
+                        "Failed to send heartbeat to master");
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
 
-  VLOG(2) << "Received heartbeat response:\n" << resp.DebugString();
-  if (!resp.leader_master()) {
-    // If the master is no longer a leader, reset proxy so that we can
-    // determine the master and attempt to heartbeat during in the
-    // next heartbeat interval.
-    proxy_.reset();
-    return Status::ServiceUnavailable("master is no longer the leader");
-  }
+  VLOG(2) << Substitute("Received heartbeat response from $0:\n$1",
+                        master_address_.ToString(), resp.DebugString());
   last_hb_response_.Swap(&resp);
 
-
-  // TODO: Handle TSHeartbeatResponsePB (e.g. deleted tablets and schema changes)
-  server_->tablet_manager()->MarkTabletReportAcknowledged(req.tablet_report());
-
+  MarkTabletReportAcknowledged(req.tablet_report());
   return Status::OK();
 }
 
 void Heartbeater::Thread::RunThread() {
   CHECK(IsCurrentThread());
-  VLOG(1) << "Heartbeat thread starting";
+  VLOG(1) << Substitute("Heartbeat thread (master $0) starting",
+                        master_address_.ToString());
 
   // Set up a fake "last heartbeat response" which indicates that we
   // need to register -- since we've never registered before, we know
@@ -396,24 +421,22 @@ void Heartbeater::Thread::RunThread() {
       heartbeat_asap_ = false;
 
       if (!should_run_) {
-        VLOG(1) << "Heartbeat thread finished";
+        VLOG(1) << Substitute("Heartbeat thread (master $0) finished",
+                              master_address_.ToString());
         return;
       }
     }
 
     Status s = DoHeartbeat();
     if (!s.ok()) {
-      LOG(WARNING) << "Failed to heartbeat to " << leader_master_hostport_.ToString()
-                   << ": " << s.ToString();
+      LOG(WARNING) << Substitute("Failed to heartbeat to $0: $1",
+                                 master_address_.ToString(), s.ToString());
       consecutive_failed_heartbeats_++;
-      if (master_addrs_.size() > 1) {
-        // If we encountered a network error (e.g., connection
-        // refused) and there's more than one master available, try
-        // determining the leader master again.
-        if (s.IsNetworkError() ||
-            consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
-          proxy_.reset();
-        }
+      // If we encountered a network error (e.g., connection refused) or just
+      // reached the failure threshold for backoff, try reconnecting.
+      if (s.IsNetworkError() ||
+          consecutive_failed_heartbeats_ == FLAGS_heartbeat_max_failures_before_backoff) {
+        proxy_.reset();
       }
       continue;
     }
@@ -425,6 +448,28 @@ bool Heartbeater::Thread::IsCurrentThread() const {
   return thread_.get() == kudu::Thread::current_thread();
 }
 
+void Heartbeater::Thread::MarkTabletReportAcknowledged(const master::TabletReportPB& report) {
+  std::lock_guard<simple_spinlock> l(dirty_tablets_lock_);
+
+  int32_t acked_seq = report.sequence_number();
+  CHECK_LT(acked_seq, next_report_seq_.load());
+
+  // Clear the "dirty" state for any tablets which have not changed since
+  // this report.
+  auto it = dirty_tablets_.begin();
+  while (it != dirty_tablets_.end()) {
+    const TabletReportState& state = it->second;
+    if (state.change_seq <= acked_seq) {
+      // This entry has not changed since this tablet report, we no longer need
+      // to track it as dirty. If it becomes dirty again, it will be re-added
+      // with a higher sequence number.
+      it = dirty_tablets_.erase(it);
+    } else {
+      ++it;
+    }
+  }
+}
+
 Status Heartbeater::Thread::Start() {
   CHECK(thread_ == nullptr);
 
@@ -454,5 +499,50 @@ void Heartbeater::Thread::TriggerASAP() {
   cond_.Signal();
 }
 
+void Heartbeater::Thread::MarkTabletDirty(const string& tablet_id, const string& reason) {
+  std::lock_guard<simple_spinlock> l(dirty_tablets_lock_);
+
+  // Even though this is an atomic load, it needs to hold the lock. To see why,
+  // consider this sequence:
+  // 0. Tablet t exists in dirty_tablets_.
+  // 1. T1 calls MarkTabletDirty(t), loads x from next_report_seq_, and is
+  //    descheduled.
+  // 2. T2 generates a tablet report, incrementing next_report_seq_ to x+1.
+  // 3. T3 calls MarkTabletDirty(t), loads x+1 from next_report_seq_, and
+  //    writes x+1 to state->change_seq.
+  // 4. T1 is scheduled. It tries to write x to state->change_seq, failing the
+  //    CHECK_GE().
+  int32_t seqno = next_report_seq_.load();
+
+  TabletReportState* state = FindOrNull(dirty_tablets_, tablet_id);
+  if (state != nullptr) {
+    CHECK_GE(seqno, state->change_seq);
+    state->change_seq = seqno;
+  } else {
+    TabletReportState state = { seqno };
+    InsertOrDie(&dirty_tablets_, tablet_id, std::move(state));
+  }
+}
+
+void Heartbeater::Thread::GenerateIncrementalTabletReport(TabletReportPB* report) {
+  report->Clear();
+  report->set_sequence_number(next_report_seq_.fetch_add(1));
+  report->set_is_incremental(true);
+  vector<string> dirty_tablet_ids;
+  {
+    std::lock_guard<simple_spinlock> l(dirty_tablets_lock_);
+    AppendKeysFromMap(dirty_tablets_, &dirty_tablet_ids);
+  }
+  server_->tablet_manager()->PopulateIncrementalTabletReport(
+      report, dirty_tablet_ids);
+}
+
+void Heartbeater::Thread::GenerateFullTabletReport(TabletReportPB* report) {
+  report->Clear();
+  report->set_sequence_number(next_report_seq_.fetch_add(1));
+  report->set_is_incremental(false);
+  server_->tablet_manager()->PopulateFullTabletReport(report);
+}
+
 } // namespace tserver
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/tserver/heartbeater.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/heartbeater.h b/src/kudu/tserver/heartbeater.h
index 2529ce2..b365216 100644
--- a/src/kudu/tserver/heartbeater.h
+++ b/src/kudu/tserver/heartbeater.h
@@ -17,35 +17,59 @@
 #ifndef KUDU_TSERVER_HEARTBEATER_H
 #define KUDU_TSERVER_HEARTBEATER_H
 
-#include "kudu/gutil/gscoped_ptr.h"
+#include <memory>
+#include <string>
+#include <vector>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+namespace master {
+class TabletReportPB;
+}
+
 namespace tserver {
 
 class TabletServer;
 struct TabletServerOptions;
 
-// Component of the Tablet Server which is responsible for heartbeating to the
-// leader master.
-//
-// TODO: send heartbeats to non-leader masters.
+// Component of the Tablet Server which is responsible for heartbeating to all
+// of the masters.
 class Heartbeater {
  public:
   Heartbeater(const TabletServerOptions& options, TabletServer* server);
+
+  // Start heartbeating to every master.
   Status Start();
+
+  // Stop heartbeating to every master.
   Status Stop();
 
-  // Trigger a heartbeat as soon as possible, even if the normal
+  // Trigger heartbeats as soon as possible, even if the normal
   // heartbeat interval has not expired.
   void TriggerASAP();
 
+  // Mark the given tablet as dirty; if already dirty, refresh its change seqno.
+  //
+  // Tablet dirtiness is tracked separately for each master. Dirty tablets are
+  // included in the heartbeat's tablet report, and only marked not dirty once
+  // the report has been acknowledged by the master.
+  void MarkTabletDirty(const std::string& tablet_id, const std::string& reason);
+
   ~Heartbeater();
 
+  // Methods for manually manipulating tablet reports, intended for testing.
+  // The generate methods return one report per master.
+  std::vector<master::TabletReportPB> GenerateIncrementalTabletReportsForTests();
+  std::vector<master::TabletReportPB> GenerateFullTabletReportsForTests();
+  void MarkTabletReportsAcknowledgedForTests(
+      const std::vector<master::TabletReportPB>& reports);
+
  private:
   class Thread;
-  gscoped_ptr<Thread> thread_;
+  std::vector<std::unique_ptr<Thread>> threads_;
   DISALLOW_COPY_AND_ASSIGN(Heartbeater);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/tserver/ts_tablet_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager-test.cc b/src/kudu/tserver/ts_tablet_manager-test.cc
index 2fa5a71..8641e0b 100644
--- a/src/kudu/tserver/ts_tablet_manager-test.cc
+++ b/src/kudu/tserver/ts_tablet_manager-test.cc
@@ -27,6 +27,7 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/util/test_util.h"
@@ -67,6 +68,7 @@ class TsTabletManagerTest : public KuduTest {
 
     tablet_manager_ = mini_server_->server()->tablet_manager();
     fs_manager_ = mini_server_->server()->fs_manager();
+    heartbeater_ = mini_server_->server()->heartbeater();
   }
 
   Status CreateNewTablet(const std::string& tablet_id,
@@ -89,10 +91,29 @@ class TsTabletManagerTest : public KuduTest {
     return tablet_peer->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10));
   }
 
+  void GenerateFullTabletReport(TabletReportPB* report) {
+    vector<TabletReportPB> reports =
+        heartbeater_->GenerateFullTabletReportsForTests();
+    ASSERT_EQ(1, reports.size());
+    report->CopyFrom(reports[0]);
+  }
+
+  void GenerateIncrementalTabletReport(TabletReportPB* report) {
+    vector<TabletReportPB> reports =
+        heartbeater_->GenerateIncrementalTabletReportsForTests();
+    ASSERT_EQ(1, reports.size());
+    report->CopyFrom(reports[0]);
+  }
+
+  void MarkTabletReportAcknowledged(const TabletReportPB& report) {
+    heartbeater_->MarkTabletReportsAcknowledgedForTests({ report });
+  }
+
  protected:
   gscoped_ptr<MiniTabletServer> mini_server_;
   FsManager* fs_manager_;
   TSTabletManager* tablet_manager_;
+  Heartbeater* heartbeater_;
 
   Schema schema_;
   RaftConfigPB config_;
@@ -157,24 +178,24 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
   int64_t seqno = -1;
 
   // Generate a tablet report before any tablets are loaded. Should be empty.
-  tablet_manager_->GenerateFullTabletReport(&report);
+  GenerateFullTabletReport(&report);
   ASSERT_FALSE(report.is_incremental());
   ASSERT_EQ(0, report.updated_tablets().size());
   ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
-  tablet_manager_->MarkTabletReportAcknowledged(report);
+  MarkTabletReportAcknowledged(report);
 
   // Another report should now be incremental, but with no changes.
-  tablet_manager_->GenerateIncrementalTabletReport(&report);
+  GenerateIncrementalTabletReport(&report);
   ASSERT_TRUE(report.is_incremental());
   ASSERT_EQ(0, report.updated_tablets().size());
   ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
-  tablet_manager_->MarkTabletReportAcknowledged(report);
+  MarkTabletReportAcknowledged(report);
 
   // Create a tablet and do another incremental report - should include the tablet.
   ASSERT_OK(CreateNewTablet("tablet-1", schema_, nullptr));
   int updated_tablets = 0;
   while (updated_tablets != 1) {
-    tablet_manager_->GenerateIncrementalTabletReport(&report);
+    GenerateIncrementalTabletReport(&report);
     updated_tablets = report.updated_tablets().size();
     ASSERT_TRUE(report.is_incremental());
     ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
@@ -184,19 +205,19 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
 
   // If we don't acknowledge the report, and ask for another incremental report,
   // it should include the tablet again.
-  tablet_manager_->GenerateIncrementalTabletReport(&report);
+  GenerateIncrementalTabletReport(&report);
   ASSERT_TRUE(report.is_incremental());
   ASSERT_EQ(1, report.updated_tablets().size());
   ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");
   ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
 
   // Now acknowledge the last report, and further incrementals should be empty.
-  tablet_manager_->MarkTabletReportAcknowledged(report);
-  tablet_manager_->GenerateIncrementalTabletReport(&report);
+  MarkTabletReportAcknowledged(report);
+  GenerateIncrementalTabletReport(&report);
   ASSERT_TRUE(report.is_incremental());
   ASSERT_EQ(0, report.updated_tablets().size());
   ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report);
-  tablet_manager_->MarkTabletReportAcknowledged(report);
+  MarkTabletReportAcknowledged(report);
 
   // Create a second tablet, and ensure the incremental report shows it.
   ASSERT_OK(CreateNewTablet("tablet-2", schema_, nullptr));
@@ -210,7 +231,7 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
   report.Clear();
   while (true) {
     bool found_tablet_2 = false;
-    tablet_manager_->GenerateIncrementalTabletReport(&report);
+    GenerateIncrementalTabletReport(&report);
     ASSERT_TRUE(report.is_incremental()) << report.ShortDebugString();
     ASSERT_MONOTONIC_REPORT_SEQNO(&seqno, report) << report.ShortDebugString();
     for (const ReportedTabletPB& reported_tablet : report.updated_tablets()) {
@@ -227,10 +248,10 @@ TEST_F(TsTabletManagerTest, TestTabletReports) {
     SleepFor(MonoDelta::FromMilliseconds(10));
   }
 
-  tablet_manager_->MarkTabletReportAcknowledged(report);
+  MarkTabletReportAcknowledged(report);
 
   // Asking for a full tablet report should re-report both tablets
-  tablet_manager_->GenerateFullTabletReport(&report);
+  GenerateFullTabletReport(&report);
   ASSERT_FALSE(report.is_incremental());
   ASSERT_EQ(2, report.updated_tablets().size());
   ASSERT_REPORT_HAS_UPDATED_TABLET(report, "tablet-1");

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 319011a..072b028 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -151,7 +151,6 @@ TSTabletManager::TSTabletManager(FsManager* fs_manager,
                                  MetricRegistry* metric_registry)
   : fs_manager_(fs_manager),
     server_(server),
-    next_report_seq_(0),
     metric_registry_(metric_registry),
     state_(MANAGER_INITIALIZING) {
 
@@ -805,13 +804,11 @@ void TSTabletManager::GetTabletPeers(vector<scoped_refptr<TabletPeer> >* tablet_
 }
 
 void TSTabletManager::MarkTabletDirty(const std::string& tablet_id, const std::string& reason) {
-  std::lock_guard<rw_spinlock> lock(lock_);
-  MarkDirtyUnlocked(tablet_id, reason);
-}
-
-int TSTabletManager::GetNumDirtyTabletsForTests() const {
-  shared_lock<rw_spinlock> l(lock_);
-  return dirty_tablets_.size();
+  VLOG(2) << Substitute("$0 Marking dirty. Reason: $1. Will report this "
+      "tablet to the Master in the next heartbeat",
+      LogPrefix(tablet_id), reason);
+  server_->heartbeater()->MarkTabletDirty(tablet_id, reason);
+  server_->heartbeater()->TriggerASAP();
 }
 
 int TSTabletManager::GetNumLiveTablets() const {
@@ -827,22 +824,6 @@ int TSTabletManager::GetNumLiveTablets() const {
   return count;
 }
 
-void TSTabletManager::MarkDirtyUnlocked(const std::string& tablet_id, const std::string& reason) {
-  TabletReportState* state = FindOrNull(dirty_tablets_, tablet_id);
-  if (state != nullptr) {
-    CHECK_GE(next_report_seq_, state->change_seq);
-    state->change_seq = next_report_seq_;
-  } else {
-    TabletReportState state;
-    state.change_seq = next_report_seq_;
-    InsertOrDie(&dirty_tablets_, tablet_id, state);
-  }
-  VLOG(2) << LogPrefix(tablet_id) << "Marking dirty. Reason: " << reason
-          << ". Will report this tablet to the Master in the next heartbeat "
-          << "as part of report #" << next_report_seq_;
-  server_->heartbeater()->TriggerASAP();
-}
-
 void TSTabletManager::InitLocalRaftPeerPB() {
   DCHECK_EQ(state(), MANAGER_INITIALIZING);
   local_peer_pb_.set_permanent_uuid(fs_manager_->uuid());
@@ -854,7 +835,7 @@ void TSTabletManager::InitLocalRaftPeerPB() {
 
 void TSTabletManager::CreateReportedTabletPB(const string& tablet_id,
                                              const scoped_refptr<TabletPeer>& tablet_peer,
-                                             ReportedTabletPB* reported_tablet) {
+                                             ReportedTabletPB* reported_tablet) const {
   reported_tablet->set_tablet_id(tablet_id);
   reported_tablet->set_state(tablet_peer->state());
   reported_tablet->set_tablet_data_state(tablet_peer->tablet_metadata()->tablet_data_state());
@@ -872,53 +853,25 @@ void TSTabletManager::CreateReportedTabletPB(const string& tablet_id,
   }
 }
 
-void TSTabletManager::GenerateIncrementalTabletReport(TabletReportPB* report) {
-  shared_lock<rw_spinlock> l(lock_);
-  report->Clear();
-  report->set_sequence_number(next_report_seq_++);
-  report->set_is_incremental(true);
-  for (const DirtyMap::value_type& dirty_entry : dirty_tablets_) {
-    const string& tablet_id = dirty_entry.first;
-    scoped_refptr<tablet::TabletPeer>* tablet_peer = FindOrNull(tablet_map_, tablet_id);
-    if (tablet_peer) {
-      // Dirty entry, report on it.
-      CreateReportedTabletPB(tablet_id, *tablet_peer, report->add_updated_tablets());
-    } else {
-      // Removed.
-      report->add_removed_tablet_ids(tablet_id);
-    }
-  }
-}
-
-void TSTabletManager::GenerateFullTabletReport(TabletReportPB* report) {
-  shared_lock<rw_spinlock> l(lock_);
-  report->Clear();
-  report->set_is_incremental(false);
-  report->set_sequence_number(next_report_seq_++);
-  for (const TabletMap::value_type& entry : tablet_map_) {
-    CreateReportedTabletPB(entry.first, entry.second, report->add_updated_tablets());
+void TSTabletManager::PopulateFullTabletReport(TabletReportPB* report) const {
+  shared_lock<rw_spinlock> shared_lock(lock_);
+  for (const auto& e : tablet_map_) {
+    CreateReportedTabletPB(e.first, e.second, report->add_updated_tablets());
   }
-  dirty_tablets_.clear();
 }
 
-void TSTabletManager::MarkTabletReportAcknowledged(const TabletReportPB& report) {
-  std::lock_guard<rw_spinlock> l(lock_);
-
-  int32_t acked_seq = report.sequence_number();
-  CHECK_LT(acked_seq, next_report_seq_);
-
-  // Clear the "dirty" state for any tablets which have not changed since
-  // this report.
-  auto it = dirty_tablets_.begin();
-  while (it != dirty_tablets_.end()) {
-    const TabletReportState& state = it->second;
-    if (state.change_seq <= acked_seq) {
-      // This entry has not changed since this tablet report, we no longer need
-      // to track it as dirty. If it becomes dirty again, it will be re-added
-      // with a higher sequence number.
-      it = dirty_tablets_.erase(it);
+void TSTabletManager::PopulateIncrementalTabletReport(TabletReportPB* report,
+                                                      const vector<string>& tablet_ids) const {
+  shared_lock<rw_spinlock> shared_lock(lock_);
+  for (const auto& id : tablet_ids) {
+    const scoped_refptr<tablet::TabletPeer>* tablet_peer =
+        FindOrNull(tablet_map_, id);
+    if (tablet_peer) {
+      // Dirty entry, report on it.
+      CreateReportedTabletPB(id, *tablet_peer, report->add_updated_tablets());
     } else {
-      ++it;
+      // Removed.
+      report->add_removed_tablet_ids(id);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/284591a8/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index a0d362b..347ab5d 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -152,37 +152,24 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
       const consensus::StartRemoteBootstrapRequestPB& req,
       boost::optional<TabletServerErrorPB::Code>* error_code) OVERRIDE;
 
-  // Generate an incremental tablet report.
-  //
-  // This will report any tablets which have changed since the last acknowleged
-  // tablet report. Once the report is successfully transferred, call
-  // MarkTabletReportAcknowledged() to clear the incremental state. Otherwise, the
-  // next tablet report will continue to include the same tablets until one
-  // is acknowleged.
-  //
-  // This is thread-safe to call along with tablet modification, but not safe
-  // to call from multiple threads at the same time.
-  void GenerateIncrementalTabletReport(master::TabletReportPB* report);
-
-  // Generate a full tablet report and reset any incremental state tracking.
-  void GenerateFullTabletReport(master::TabletReportPB* report);
+  // Adds information about every tablet managed by this TSTabletManager to 'report'.
+  void PopulateFullTabletReport(master::TabletReportPB* report) const;
 
-  // Mark that the master successfully received and processed the given
-  // tablet report. This uses the report sequence number to "un-dirty" any
-  // tablets which have not changed since the acknowledged report.
-  void MarkTabletReportAcknowledged(const master::TabletReportPB& report);
+  // Adds updated tablet information to 'report' for each tablet in 'tablet_ids'.
+  // IDs with no matching tablet are added to the report as removed tablets.
+  void PopulateIncrementalTabletReport(master::TabletReportPB* report,
+                                       const std::vector<std::string>& tablet_ids) const;
 
   // Get all of the tablets currently hosted on this server.
   void GetTabletPeers(std::vector<scoped_refptr<tablet::TabletPeer> >* tablet_peers) const;
 
-  // Marks tablet with 'tablet_id' dirty.
-  // Used for state changes outside of the control of TsTabletManager, such as consensus role
-  // changes.
+  // Marks tablet with 'tablet_id' as dirty so that it'll be included in the
+  // next round of master heartbeats.
+  //
+  // Dirtying events typically include state changes outside of the control of
+  // TsTabletManager, such as consensus role changes.
   void MarkTabletDirty(const std::string& tablet_id, const std::string& reason);
 
-  // Returns the number of tablets in the "dirty" map, for use by unit tests.
-  int GetNumDirtyTabletsForTests() const;
-
   // Return the number of tablets in RUNNING or BOOTSTRAPPING state.
   int GetNumLiveTablets() const;
 
@@ -197,15 +184,6 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
     REPLACEMENT_PEER
   };
 
-  // Each tablet report is assigned a sequence number, so that subsequent
-  // tablet reports only need to re-report those tablets which have
-  // changed since the last report. Each tablet tracks the sequence
-  // number at which it became dirty.
-  struct TabletReportState {
-    uint32_t change_seq;
-  };
-  typedef std::unordered_map<std::string, TabletReportState> DirtyMap;
-
   // Standard log prefix, given a tablet id.
   std::string LogPrefix(const std::string& tablet_id) const;
 
@@ -268,13 +246,7 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
   // Helper to generate the report for a single tablet.
   void CreateReportedTabletPB(const std::string& tablet_id,
                               const scoped_refptr<tablet::TabletPeer>& tablet_peer,
-                              master::ReportedTabletPB* reported_tablet);
-
-  // Mark that the provided TabletPeer's state has changed. That should be taken into
-  // account in the next report.
-  //
-  // NOTE: requires that the caller holds the lock.
-  void MarkDirtyUnlocked(const std::string& tablet_id, const std::string& reason);
+                              master::ReportedTabletPB* reported_tablet) const;
 
   // Handle the case on startup where we find a tablet that is not in
   // TABLET_DATA_READY state. Generally, we tombstone the replica.
@@ -334,14 +306,6 @@ class TSTabletManager : public tserver::TabletPeerLookupIf {
   // bootstrap, creation, or deletion is in-progress
   TransitionInProgressMap transition_in_progress_;
 
-  // Tablets to include in the next incremental tablet report.
-  // When a tablet is added/removed/added locally and needs to be
-  // reported to the master, an entry is added to this map.
-  DirtyMap dirty_tablets_;
-
-  // Next tablet report seqno.
-  int32_t next_report_seq_;
-
   MetricRegistry* metric_registry_;
 
   TSTabletManagerStatePB state_;