You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/09 00:07:27 UTC

kudu git commit: KUDU-1125: issue one catalog write per tablet report

Repository: kudu
Updated Branches:
  refs/heads/master 761ce10b7 -> 29a7568dd


KUDU-1125: issue one catalog write per tablet report

This commit addresses a long-standing issue in the catalog manager where an
independent catalog write is performed for each reported tablet. When the
master is configured to fsync WAL writes, this can add a lot of load during
election storms and when the master is restarted.

To tackle this, I fully rewrote ProcessTabletReport and its helpers (e.g.
HandleReportedTablet and HandleRaftConfigChanged). I had higher hopes for
the final product, but the interdependent control flow makes the function
hard to decompose. Still, I managed to make some improvements. For example,
all RPCs are now sent together at the end rather than piecemeal. I also
rewrote all of the comments so that they serve as a map of the function and
emphasize the actions performed by the corresponding code.

Here are the actual semantic changes being made:
- Table and tablet locks are now acquired en masse. For tablets, this is
  required for correctness; I've documented why I did the same for tables.
- We no longer check for non-running tables. As far as I can tell, this was
  a useless check because a non-running table must be a deleted table, and
  there's already a check for that just before.
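- We no longer wake the BgTasks thread upon completion. This is because:
  1. WakeIfHasPendingUpdates() was semi-broken; pending_updates_ is never
     set in ProcessTabletReport.
  2. Regardless, there's no work for the BgTasks thread to do.
- The --catalog_manager_delete_orphaned_tablets flag is removed, along with
  the part of TestUnknownTabletsAreNotDeleted that exercised it.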

Change-Id: Ie6f5cf0e4b1cd1160b3b310d89c6dbf3dd62e43b
Reviewed-on: http://gerrit.cloudera.org:8080/8090
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/29a7568d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/29a7568d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/29a7568d

Branch: refs/heads/master
Commit: 29a7568ddb5d24b1faf583b45d38884bfeb2b260
Parents: 761ce10
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Sep 12 19:45:50 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Oct 9 00:07:08 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/delete_table-itest.cc     |  32 +-
 src/kudu/master/catalog_manager.cc              | 757 ++++++++++---------
 src/kudu/master/catalog_manager.h               |  46 +-
 3 files changed, 391 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/29a7568d/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 23f9be6..41a96d1 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -1100,12 +1100,6 @@ TEST_F(DeleteTableITest, TestUnknownTabletsAreNotDeleted) {
       .num_replicas(1)
       .Create());
 
-  // Figure out the tablet id of the created tablet.
-  const MonoDelta timeout = MonoDelta::FromSeconds(30);
-  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
-  ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
-  const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
   // Delete the master's metadata and start it back up. The tablet created
   // above is now unknown, but should not be deleted!
   cluster_->master()->Shutdown();
@@ -1116,6 +1110,8 @@ TEST_F(DeleteTableITest, TestUnknownTabletsAreNotDeleted) {
   // so that it can be found after the subsequent restart below.
   ASSERT_OK(cluster_->master()->WaitForCatalogManager());
 
+  // The master should not delete the tablet. Let's wait for at least one
+  // heartbeat to pass.
   int64_t num_delete_attempts;
   ASSERT_EVENTUALLY([&]() {
     int64_t num_heartbeats;
@@ -1130,30 +1126,6 @@ TEST_F(DeleteTableITest, TestUnknownTabletsAreNotDeleted) {
       &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
       "total_count", &num_delete_attempts));
   ASSERT_EQ(0, num_delete_attempts);
-
-  // Now restart the master with orphan deletion enabled. The tablet should get
-  // deleted.
-  // We also need to restart the tablet server, or else the old tablet server
-  // won't be able to authenticate to the new master, due to it having a new
-  // CA certificate which the old tserver doesn't trust.
-  //
-  // TODO(KUDU-65): perhaps this is actually a feature? should we have tablet servers
-  // remember the CA cert persistently so that it's impossible to connect an
-  // old tserver to a new cluster?
-  cluster_->Shutdown();
-  cluster_->master()->mutable_flags()->push_back(
-      "--catalog_manager_delete_orphaned_tablets");
-  ASSERT_OK(cluster_->Restart());
-  ASSERT_EVENTUALLY([&]() {
-    ASSERT_OK(cluster_->tablet_server(0)->GetInt64Metric(
-        &METRIC_ENTITY_server, "kudu.tabletserver",
-        &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
-        "total_count", &num_delete_attempts));
-    // Sometimes the tablet server has time to report the orphaned tablet multiple times
-    // before the delete succeeds. This is ok because tablet deletion is idempotent.
-    ASSERT_GE(num_delete_attempts, 1);
-    ASSERT_OK(CheckTabletDeletedOnTS(0, tablet_id, SUPERBLOCK_NOT_EXPECTED));
-  });
 }
 
 // Ensure that the master doesn't try to delete tombstoned tablets.

http://git-wip-us.apache.org/repos/asf/kudu/blob/29a7568d/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index e073a97..2cfdcc9 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -46,13 +46,13 @@
 #include <cstdlib>
 #include <functional>
 #include <iterator>
-#include <map>
 #include <memory>
 #include <mutex>
 #include <ostream>
 #include <set>
 #include <string>
 #include <type_traits>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -107,6 +107,7 @@
 #include "kudu/security/token_signer.h"
 #include "kudu/security/token_signing_key.h"
 #include "kudu/server/monitored_task.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/transactions/transaction_tracker.h"
 #include "kudu/tserver/tserver_admin.pb.h"
@@ -217,14 +218,6 @@ DEFINE_bool(catalog_manager_fail_ts_rpcs, false,
 TAG_FLAG(catalog_manager_fail_ts_rpcs, hidden);
 TAG_FLAG(catalog_manager_fail_ts_rpcs, runtime);
 
-DEFINE_bool(catalog_manager_delete_orphaned_tablets, false,
-            "Whether the master should delete tablets reported by tablet "
-            "servers for which there are no corresponding records in the "
-            "master's metadata. Use this option with care; it may cause "
-            "permanent tablet data loss under specific (and rare) cases of "
-            "master failures!");
-TAG_FLAG(catalog_manager_delete_orphaned_tablets, advanced);
-
 DEFINE_int32(catalog_manager_inject_latency_prior_tsk_write_ms, 0,
              "Injects a random sleep between 0 and this many milliseconds "
              "prior to writing newly generated TSK into the system table. "
@@ -232,12 +225,12 @@ DEFINE_int32(catalog_manager_inject_latency_prior_tsk_write_ms, 0,
 TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, hidden);
 TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, unsafe);
 
-using std::map;
 using std::pair;
 using std::set;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::unordered_set;
 using std::vector;
 
@@ -447,13 +440,6 @@ class CatalogManagerBgTasks {
     pending_updates_ = false;
   }
 
-  void WakeIfHasPendingUpdates() {
-    MutexLock lock(lock_);
-    if (pending_updates_) {
-      cond_.Broadcast();
-    }
-  }
-
  private:
   void Run();
 
@@ -2273,41 +2259,18 @@ void CatalogManager::NotifyTabletDeleteSuccess(const string& permanent_uuid,
   // tablet.
 }
 
-Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
-                                           const TabletReportPB& report,
-                                           TabletReportUpdatesPB *report_update,
-                                           RpcContext* rpc) {
-  TRACE_EVENT2("master", "ProcessTabletReport",
-               "requestor", rpc->requestor_string(),
-               "num_tablets", report.updated_tablets_size());
-
-  leader_lock_.AssertAcquiredForReading();
-
-  VLOG(2) << Substitute("Received tablet report from $0:\n$1",
-                        RequestorString(rpc), SecureDebugString(report));
-
-  // TODO: on a full tablet report, we may want to iterate over the tablets we think
-  // the server should have, compare vs the ones being reported, and somehow mark
-  // any that have been "lost" (eg somehow the tablet metadata got corrupted or something).
-
-  for (const ReportedTabletPB& reported : report.updated_tablets()) {
-    ReportedTabletUpdatesPB *tablet_report = report_update->add_tablets();
-    tablet_report->set_tablet_id(reported.tablet_id());
-    RETURN_NOT_OK_PREPEND(HandleReportedTablet(ts_desc, reported, tablet_report),
-                          Substitute("Error handling $0", SecureShortDebugString(reported)));
-  }
-
-  if (report.updated_tablets_size() > 0) {
-    background_tasks_->WakeIfHasPendingUpdates();
-  }
-
-  return Status::OK();
-}
-
 namespace {
-// Return true if receiving 'report' for a tablet in CREATING state should
-// transition it to the RUNNING state.
-bool ShouldTransitionTabletToRunning(const ReportedTabletPB& report) {
+
+// Returns true if 'report' for 'tablet' should cause it to transition to RUNNING.
+//
+// Note: do not use the consensus state in 'report'; use 'cstate' instead.
+bool ShouldTransitionTabletToRunning(const scoped_refptr<TabletInfo>& tablet,
+                                     const ReportedTabletPB& report,
+                                     const ConsensusStatePB& cstate) {
+  // Does the master think the tablet is running?
+  if (tablet->metadata().state().is_running()) return false;
+
+  // Does the report indicate that the tablet is running?
   if (report.state() != tablet::RUNNING) return false;
 
   // In many tests, we disable leader election, so newly created tablets
@@ -2319,280 +2282,11 @@ bool ShouldTransitionTabletToRunning(const ReportedTabletPB& report) {
 
   // Otherwise, we only transition to RUNNING once there is a leader that is a
   // member of the committed configuration.
-  const ConsensusStatePB& cstate = report.consensus_state();
   return !cstate.leader_uuid().empty() &&
       IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config());
 }
-} // anonymous namespace
-
-Status CatalogManager::HandleReportedTablet(TSDescriptor* ts_desc,
-                                            const ReportedTabletPB& report,
-                                            ReportedTabletUpdatesPB *report_updates) {
-  TRACE_EVENT1("master", "HandleReportedTablet",
-               "tablet_id", report.tablet_id());
-  scoped_refptr<TabletInfo> tablet;
-  {
-    shared_lock<LockType> l(lock_);
-    tablet = FindPtrOrNull(tablet_map_, report.tablet_id());
-  }
-  if (!tablet) {
-    // It'd be unsafe to ask the tserver to delete this tablet without first
-    // replicating something to our followers (i.e. to guarantee that we're the
-    // leader). For example, if we were a rogue master, we might be deleting a
-    // tablet created by a new master accidentally. But masters retain metadata
-    // for deleted tablets forever, so a tablet can only be truly unknown in
-    // the event of a serious misconfiguration, such as a tserver heartbeating
-    // to the wrong cluster. Therefore, it should be reasonable to ignore it
-    // and wait for an operator fix the situation.
-    if (FLAGS_catalog_manager_delete_orphaned_tablets) {
-      LOG(INFO) << "Deleting unknown tablet " << report.tablet_id();
-      SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_DELETED,
-                               boost::none, nullptr, ts_desc->permanent_uuid(),
-                               "Report from unknown tablet");
-    } else {
-      LOG(WARNING) << "Ignoring report from unknown tablet: "
-                   << report.tablet_id();
-    }
-    return Status::OK();
-  }
-  DCHECK(tablet->table()); // guaranteed by TabletLoader
-
-  TableMetadataLock table_lock(tablet->table().get(), LockMode::READ);
-  TabletMetadataLock tablet_lock(tablet.get(), LockMode::WRITE);
-
-  // If the TS is reporting a tablet which has been deleted, or a tablet from
-  // a table which has been deleted, send it an RPC to delete it.
-  if (tablet_lock.data().is_deleted() ||
-      table_lock.data().is_deleted()) {
-    report_updates->set_state_msg(tablet_lock.data().pb.state_msg());
-    const string msg = tablet_lock.data().pb.state_msg();
-    LOG(INFO) << Substitute("Got report from deleted tablet $0 ($1): Sending "
-        "delete request for this tablet", tablet->ToString(), msg);
-    // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
-    // where that might be possible (tablet creation timeout & replacement).
-    SendDeleteReplicaRequest(tablet->id(), TABLET_DATA_DELETED,
-                             boost::none, tablet->table(),
-                             ts_desc->permanent_uuid(), msg);
-    return Status::OK();
-  }
-
-  if (!table_lock.data().is_running()) {
-    LOG(INFO) << Substitute("Got report from tablet $0 for non-running table $1: $2",
-                            tablet->id(), tablet->table()->ToString(),
-                            tablet_lock.data().pb.state_msg());
-    report_updates->set_state_msg(tablet_lock.data().pb.state_msg());
-    return Status::OK();
-  }
-
-  // Check if the tablet requires an "alter table" call
-  bool tablet_needs_alter = false;
-  if (report.has_schema_version() &&
-      table_lock.data().pb.version() != report.schema_version()) {
-    if (report.schema_version() > table_lock.data().pb.version()) {
-      LOG(ERROR) << Substitute("TS $0 has reported a schema version greater "
-          "than the current one for tablet $1. Expected version $2 got $3 (corruption)",
-          ts_desc->ToString(), tablet->ToString(),
-          table_lock.data().pb.version(), report.schema_version());
-    } else {
-      LOG(INFO) << Substitute("TS $0 does not have the latest schema for tablet $1. "
-          "Expected version $2 got $3", ts_desc->ToString(), tablet->ToString(),
-          table_lock.data().pb.version(), report.schema_version());
-    }
-    // It's possible that the tablet being reported is a laggy replica, and in fact
-    // the leader has already received an AlterTable RPC. That's OK, though --
-    // it'll safely ignore it if we send another.
-    tablet_needs_alter = true;
-  }
-
-  // Check if we got a report from a tablet that is no longer part of the Raft
-  // config, but is not already TOMBSTONED / DELETED. If so, tombstone it.
-  //
-  // If the report includes a committed raft config, a delete is only sent if
-  // the opid_index is strictly less than the latest reported committed config.
-  // This prevents us from spuriously deleting replicas that have just been
-  // added to a pending config and are in the process of catching up to the log
-  // entry where they were added to the config.
-  const ConsensusStatePB& prev_cstate = tablet_lock.data().pb.consensus_state();
-  const int64_t prev_opid_index = prev_cstate.committed_config().opid_index();
-  const int64_t report_opid_index = report.has_consensus_state() ?
-      report.consensus_state().committed_config().opid_index() : consensus::kInvalidOpIdIndex;
-  if (FLAGS_master_tombstone_evicted_tablet_replicas &&
-      report.tablet_data_state() != TABLET_DATA_TOMBSTONED &&
-      report.tablet_data_state() != TABLET_DATA_DELETED &&
-      !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.committed_config()) &&
-      report_opid_index < prev_opid_index) {
-    const string delete_msg = report_opid_index == consensus::kInvalidOpIdIndex ?
-        "Replica has no consensus available" :
-        Substitute("Replica with old config index $0", report_opid_index);
-    SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED, prev_opid_index,
-        tablet->table(), ts_desc->permanent_uuid(),
-        Substitute("$0 (current committed config index is $1)", delete_msg, prev_opid_index));
-    return Status::OK();
-  }
-
-  // If the tablet has an error and doesn't need to be deleted, there's not
-  // much we can do.
-  if (report.has_error()) {
-    Status s = StatusFromPB(report.error());
-    DCHECK(!s.ok());
-    LOG(WARNING) << Substitute("Tablet $0 has failed on TS $1: $2",
-                               tablet->ToString(), ts_desc->ToString(), s.ToString());
-    return Status::OK();
-  }
-
-  // The report may have a consensus_state even when tombstoned.
-  if (report.has_consensus_state()) {
-    ConsensusStatePB cstate = report.consensus_state();
-
-    // The Master only processes reports for replicas with committed consensus
-    // configurations since it needs the committed index to only cache the most
-    // up-to-date config. Since it's possible for TOMBSTONED replicas with no
-    // ConsensusMetadata on disk to be reported as having no committed config
-    // opid_index, we skip over those replicas.
-    if (!cstate.committed_config().has_opid_index()) {
-      return Status::OK();
-    }
-
-    // If the reported leader is not a member of the committed config then we
-    // disregard the leader state.
-    if (cstate.leader_uuid().empty() ||
-        !IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config())) {
-      cstate.clear_leader_uuid();
-    }
-
-    // If the tablet was not RUNNING, and we have a leader elected, mark it as RUNNING.
-    // We need to wait for a leader before marking a tablet as RUNNING, or else we
-    // could incorrectly consider a tablet created when only a minority of its replicas
-    // were successful. In that case, the tablet would be stuck in this bad state
-    // forever.
-    if (!tablet_lock.data().is_running() && ShouldTransitionTabletToRunning(report)) {
-      DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet_lock.data().pb.state())
-          << Substitute("Tablet in unexpected state: $0: $1", tablet->ToString(),
-                        SecureShortDebugString(tablet_lock.data().pb));
-      // Mark the tablet as running.
-      // TODO(matteo): we could batch the IO onto a background thread, or at
-      // least across multiple tablets in the same report.
-      VLOG(1) << Substitute("Tablet $0 is now online", tablet->ToString());
-      tablet_lock.mutable_data()->set_state(SysTabletsEntryPB::RUNNING,
-                                            "Tablet reported with an active leader");
-    }
-
-    if (cstate.committed_config().opid_index() > prev_cstate.committed_config().opid_index() ||
-        (!cstate.leader_uuid().empty() &&
-         (prev_cstate.leader_uuid().empty() ||
-          cstate.current_term() > prev_cstate.current_term()))) {
-
-      // When a config change is reported to the master, it may not include the
-      // leader because the follower doing the reporting may not know who the
-      // leader is yet (it may have just started up). If the reported config
-      // has the same term as the previous config, and the leader was
-      // previously known for the current term, then retain knowledge of that
-      // leader even if it wasn't reported in the latest config.
-      if (cstate.current_term() == prev_cstate.current_term()) {
-        if (cstate.leader_uuid().empty() && !prev_cstate.leader_uuid().empty()) {
-          cstate.set_leader_uuid(prev_cstate.leader_uuid());
-        // Sanity check to detect consensus divergence bugs.
-        } else if (!cstate.leader_uuid().empty() && !prev_cstate.leader_uuid().empty() &&
-                   cstate.leader_uuid() != prev_cstate.leader_uuid()) {
-          string msg = Substitute("Previously reported cstate for tablet $0 gave "
-                                  "a different leader for term $1 than the current cstate. "
-                                  "Previous cstate: $2. Current cstate: $3.",
-                                  tablet->ToString(), cstate.current_term(),
-                                  SecureShortDebugString(prev_cstate),
-                                  SecureShortDebugString(cstate));
-          LOG(DFATAL) << msg;
-          return Status::InvalidArgument(msg);
-        }
-      }
-
-      // If a replica is reporting a new consensus configuration, update the
-      // master's copy of that configuration.
-      LOG(INFO) << Substitute("T $0 P $1 reported cstate change: $2. New cstate: $3",
-                              tablet->id(), ts_desc->permanent_uuid(),
-                              DiffConsensusStates(prev_cstate, cstate),
-                              SecureShortDebugString(cstate));
-
-      // To ensure consistent handling when modifying or sanitizing the cstate
-      // above, copy the whole report on the stack rather than const-casting.
-      ReportedTabletPB modified_report = report;
-      *modified_report.mutable_consensus_state() = cstate;
-
-      VLOG(2) << Substitute("Updating cstate for tablet $0 from config reported by $1 "
-                            "to that committed in log index $2 with leader state from term $3",
-                            modified_report.tablet_id(), ts_desc->ToString(),
-                            modified_report.consensus_state().committed_config().opid_index(),
-                            modified_report.consensus_state().current_term());
-
-      RETURN_NOT_OK(HandleRaftConfigChanged(modified_report, tablet,
-                                            table_lock, &tablet_lock));
-    }
-  }
-
-  table_lock.Unlock();
-
-  // We update the tablets each time they are reported.
-  // SysCatalogTable::Write will short-circuit the case where the data
-  // has not in fact changed since the previous version and avoid any
-  // unnecessary operations.
-  SysCatalogTable::Actions actions;
-  actions.tablets_to_update.emplace_back(tablet);
-  Status s = sys_catalog_->Write(actions);
-  if (!s.ok()) {
-    LOG(ERROR) << Substitute("Error updating tablets: $0. Tablet report was: $1",
-                             s.ToString(), SecureShortDebugString(report));
-    return s;
-  }
-  tablet_lock.Commit();
-
-  // Need to defer the AlterTable command to after we've committed the new tablet data,
-  // since the tablet report may also be updating the raft config, and the Alter Table
-  // request needs to know who the most recent leader is.
-  if (tablet_needs_alter) {
-    SendAlterTabletRequest(tablet);
-  } else if (report.has_schema_version()) {
-    HandleTabletSchemaVersionReport(tablet, report.schema_version());
-  }
-
-  return Status::OK();
-}
-
-Status CatalogManager::HandleRaftConfigChanged(
-    const ReportedTabletPB& report,
-    const scoped_refptr<TabletInfo>& tablet,
-    const TableMetadataLock& table_lock,
-    TabletMetadataLock* tablet_lock) {
-
-  DCHECK(tablet_lock->is_write_locked());
-  ConsensusStatePB prev_cstate = tablet_lock->mutable_data()->pb.consensus_state();
-  const ConsensusStatePB& cstate = report.consensus_state();
-  *tablet_lock->mutable_data()->pb.mutable_consensus_state() = cstate;
-
-  if (FLAGS_master_tombstone_evicted_tablet_replicas) {
-    unordered_set<string> current_member_uuids;
-    for (const consensus::RaftPeerPB& peer : cstate.committed_config().peers()) {
-      InsertOrDie(&current_member_uuids, peer.permanent_uuid());
-    }
-    // Send a DeleteTablet() request to peers that are not in the new config.
-    for (const consensus::RaftPeerPB& prev_peer : prev_cstate.committed_config().peers()) {
-      const string& peer_uuid = prev_peer.permanent_uuid();
-      if (!ContainsKey(current_member_uuids, peer_uuid)) {
-        SendDeleteReplicaRequest(report.tablet_id(), TABLET_DATA_TOMBSTONED,
-                                 prev_cstate.committed_config().opid_index(),
-                                 tablet->table(), peer_uuid,
-                                 Substitute("TS $0 not found in new config with opid_index $1",
-                                            peer_uuid, cstate.committed_config().opid_index()));
-      }
-    }
-  }
-
-  // If the config is under-replicated, add a server to the config.
-  if (FLAGS_master_add_server_when_underreplicated &&
-      CountVoters(cstate.committed_config()) < table_lock.data().pb.num_replicas()) {
-    SendAddServerRequest(tablet, cstate);
-  }
 
-  return Status::OK();
-}
+} // anonymous namespace
 
 Status CatalogManager::GetTabletReplica(const string& tablet_id,
                                         scoped_refptr<TabletReplica>* replica) const {
@@ -2728,6 +2422,7 @@ class RetryingTSRpcTask : public MonitoredTask {
 
   MonoTime start_timestamp() const override { return start_ts_; }
   MonoTime completion_timestamp() const override { return end_ts_; }
+  const scoped_refptr<TableInfo>& table() const { return table_ ; }
 
  protected:
   // Send an RPC request and register a callback.
@@ -3401,24 +3096,374 @@ void AsyncAddServerTask::HandleResponse(int attempt) {
   }
 }
 
+Status CatalogManager::ProcessTabletReport(
+    TSDescriptor* ts_desc,
+    const TabletReportPB& full_report,
+    TabletReportUpdatesPB* full_report_update,
+    RpcContext* rpc) {
+  int num_tablets = full_report.updated_tablets_size();
+  TRACE_EVENT2("master", "ProcessTabletReport",
+               "requestor", rpc->requestor_string(),
+               "num_tablets", num_tablets);
+  TRACE_COUNTER_INCREMENT("reported_tablets", num_tablets);
+
+  leader_lock_.AssertAcquiredForReading();
+
+  VLOG(2) << Substitute("Received tablet report from $0:\n$1",
+                        RequestorString(rpc), SecureDebugString(full_report));
+
+  // TODO(todd): on a full tablet report, we may want to iterate over the
+  // tablets we think the server should have, compare vs the ones being
+  // reported, and somehow mark any that have been "lost" (eg somehow the
+  // tablet metadata got corrupted or something).
+
+  // Maps a tablet ID to its corresponding tablet report (owned by 'full_report').
+  unordered_map<string, const ReportedTabletPB*> reports;
+
+  // Maps a tablet ID to its corresponding tablet report update (owned by
+  // 'full_report_update').
+  unordered_map<string, ReportedTabletUpdatesPB*> updates;
+
+  // Maps a tablet ID to its corresponding TabletInfo.
+  unordered_map<string, scoped_refptr<TabletInfo>> tablet_infos;
+
+  // Keeps track of all RPCs that should be sent when we're done.
+  vector<unique_ptr<RetryingTSRpcTask>> rpcs;
+
+  // Locks the referenced tables (for READ) and tablets (for WRITE).
+  //
+  // We must hold the tablets' locks while writing to the catalog table, and
+  // since they're locked for WRITE, we have to lock them en masse in order to
+  // avoid deadlocking.
+  //
+  // We have more freedom with the table locks: we could acquire them en masse,
+  // or we could acquire, use, and release them one at a time. So why do we
+  // acquire en masse? Because it reduces the overall number of lock
+  // acquisitions by reusing locks for tablets belonging to the same table, and
+  // although one-at-a-time acquisition would reduce table lock contention when
+  // writing, table writes are very rare events.
+  TableMetadataGroupLock tables_lock(LockMode::RELEASED);
+  TabletMetadataGroupLock tablets_lock(LockMode::RELEASED);
+
+  // 1. Set up local state.
+  full_report_update->mutable_tablets()->Reserve(num_tablets);
+  {
+    // We only need to acquire lock_ for the tablet_map_ access, but since it's
+    // acquired exclusively so rarely, it's probably cheaper to acquire and
+    // hold it for all tablets here than to acquire/release it for each tablet.
+    shared_lock<LockType> l(lock_);
+    for (const ReportedTabletPB& report : full_report.updated_tablets()) {
+      const string& tablet_id = report.tablet_id();
+
+      // 1a. Prepare an update entry for this tablet. Every tablet in the
+      // report gets one, even if there's no change to it.
+      ReportedTabletUpdatesPB* update = full_report_update->add_tablets();
+      update->set_tablet_id(tablet_id);
+
+      // 1b. Find the tablet, deleting/skipping it if it can't be found.
+      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(tablet_map_, tablet_id);
+      if (!tablet) {
+        // It'd be unsafe to ask the tserver to delete this tablet without first
+        // replicating something to our followers (i.e. to guarantee that we're
+        // the leader). For example, if we were a rogue master, we might be
+        // deleting a tablet created by a new master accidentally. But masters
+        // retain metadata for deleted tablets forever, so a tablet can only be
+        // truly unknown in the event of a serious misconfiguration, such as a
+        // tserver heartbeating to the wrong cluster. Therefore, it should be
+        // reasonable to ignore it and wait for an operator to fix the situation.
+        LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
+        continue;
+      }
+
+      // 1c. Found the tablet, update local state. If multiple tablets with the
+      // same ID are in the report, all but the last one will be ignored.
+      reports[tablet_id] = &report;
+      updates[tablet_id] = update;
+      tablet_infos[tablet_id] = tablet;
+      tables_lock.AddInfo(*tablet->table().get());
+      tablets_lock.AddMutableInfo(tablet.get());
+    }
+  }
+
+  // 2. Lock the affected tables and tablets.
+  tables_lock.Lock(LockMode::READ);
+  tablets_lock.Lock(LockMode::WRITE);
+
+  // 3. Process each tablet. This may not be in the order that the tablets
+  // appear in 'full_report', but that has no bearing on correctness.
+  vector<scoped_refptr<TabletInfo>> mutated_tablets;
+  for (const auto& e : tablet_infos) {
+    const string& tablet_id = e.first;
+    const scoped_refptr<TabletInfo>& tablet = e.second;
+    const scoped_refptr<TableInfo>& table = tablet->table();
+    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
+    ReportedTabletUpdatesPB* update = FindOrDie(updates, tablet_id);
+    bool tablet_was_mutated = false;
+
+    // 4. Delete the tablet if it (or its table) has been deleted.
+    if (tablet->metadata().state().is_deleted() ||
+        table->metadata().state().is_deleted()) {
+      const string& msg = tablet->metadata().state().pb.state_msg();
+      update->set_state_msg(msg);
+      LOG(INFO) << Substitute("Got report from deleted tablet $0 ($1): Sending "
+          "delete request for this tablet", tablet->ToString(), msg);
+
+      // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
+      // where that might be possible (tablet creation timeout & replacement).
+      rpcs.emplace_back(new AsyncDeleteReplica(
+          master_, ts_desc->permanent_uuid(), table, tablet_id,
+          TABLET_DATA_DELETED, boost::none, msg));
+      continue;
+    }
+
+    // 5. Tombstone a replica that is no longer part of the Raft config (and
+    // not already tombstoned or deleted outright).
+    //
+    // If the report includes a committed raft config, we only tombstone if
+    // the opid_index is strictly less than the latest reported committed
+    // config. This prevents us from spuriously deleting replicas that have
+    // just been added to the committed config and are in the process of copying.
+    const ConsensusStatePB& prev_cstate = tablet->metadata().state().pb.consensus_state();
+    const int64_t prev_opid_index = prev_cstate.committed_config().opid_index();
+    const int64_t report_opid_index = (report.has_consensus_state() &&
+        report.consensus_state().committed_config().has_opid_index()) ?
+            report.consensus_state().committed_config().opid_index() :
+            consensus::kInvalidOpIdIndex;
+    if (FLAGS_master_tombstone_evicted_tablet_replicas &&
+        report.tablet_data_state() != TABLET_DATA_TOMBSTONED &&
+        report.tablet_data_state() != TABLET_DATA_DELETED &&
+        !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.committed_config()) &&
+        report_opid_index < prev_opid_index) {
+      const string delete_msg = report_opid_index == consensus::kInvalidOpIdIndex ?
+          "Replica has no consensus available" :
+          Substitute("Replica with old config index $0", report_opid_index);
+      rpcs.emplace_back(new AsyncDeleteReplica(
+          master_, ts_desc->permanent_uuid(), table, tablet_id,
+          TABLET_DATA_TOMBSTONED, prev_opid_index,
+          Substitute("$0 (current committed config index is $1)",
+                     delete_msg, prev_opid_index)));
+      continue;
+    }
+
+    // 6. Skip a non-deleted tablet which reports an error.
+    if (report.has_error()) {
+      Status s = StatusFromPB(report.error());
+      DCHECK(!s.ok());
+      LOG(WARNING) << Substitute("Tablet $0 has failed on TS $1: $2",
+                                 tablet->ToString(), ts_desc->ToString(), s.ToString());
+      continue;
+    }
+
+    // 7. Process the report's consensus state. There may be one even when the
+    // replica has been tombstoned.
+    if (report.has_consensus_state()) {
+      // 7a. The master only processes reports for replicas with committed
+      // consensus configurations since it needs the committed index to only
+      // cache the most up-to-date config. Since it's possible for TOMBSTONED
+      // replicas with no ConsensusMetadata on disk to be reported as having no
+      // committed config opid_index, we skip over those replicas.
+      if (!report.consensus_state().committed_config().has_opid_index()) {
+        continue;
+      }
+
+      // 7b. Disregard the leader state if the reported leader is not a member
+      // of the committed config.
+      ConsensusStatePB cstate = report.consensus_state();
+      if (cstate.leader_uuid().empty() ||
+          !IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config())) {
+        cstate.clear_leader_uuid();
+      }
+
+      // 7c. Mark the tablet as RUNNING if it makes sense to do so.
+      //
+      // We need to wait for a leader before marking a tablet as RUNNING, or
+      // else we could incorrectly consider a tablet created when only a
+      // minority of its replicas were successful. In that case, the tablet
+      // would be stuck in this bad state forever.
+      if (ShouldTransitionTabletToRunning(tablet, report, cstate)) {
+        DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet->metadata().state().pb.state())
+            << Substitute("Tablet in unexpected state: $0: $1", tablet->ToString(),
+                          SecureShortDebugString(tablet->metadata().state().pb));
+        VLOG(1) << Substitute("Tablet $0 is now online", tablet->ToString());
+        tablet->mutable_metadata()->mutable_dirty()->set_state(
+            SysTabletsEntryPB::RUNNING, "Tablet reported with an active leader");
+        tablet_was_mutated = true;
+      }
+
+      // 7d. Update the consensus state if:
+      // - A config change operation was committed (reflected by a change to
+      //   the committed config's opid_index).
+      // - The new cstate has a leader, and either the old cstate didn't, or
+      //   there was a term change.
+      if (cstate.committed_config().opid_index() > prev_cstate.committed_config().opid_index() ||
+          (!cstate.leader_uuid().empty() &&
+           (prev_cstate.leader_uuid().empty() ||
+            cstate.current_term() > prev_cstate.current_term()))) {
+
+        // 7d(i). Retain knowledge of the leader even if it wasn't reported in
+        // the latest config.
+        //
+        // When a config change is reported to the master, it may not include
+        // the leader because the follower doing the reporting may not know who
+        // the leader is yet (it may have just started up). It is safe to reuse
+        // the previous leader if the reported cstate has the same term as the
+        // previous cstate, and the leader was known for that term.
+        if (cstate.current_term() == prev_cstate.current_term()) {
+          if (cstate.leader_uuid().empty() && !prev_cstate.leader_uuid().empty()) {
+            cstate.set_leader_uuid(prev_cstate.leader_uuid());
+            // Sanity check to detect consensus divergence bugs.
+          } else if (!cstate.leader_uuid().empty() &&
+              !prev_cstate.leader_uuid().empty() &&
+              cstate.leader_uuid() != prev_cstate.leader_uuid()) {
+            string msg = Substitute("Previously reported cstate for tablet $0 gave "
+                "a different leader for term $1 than the current cstate. "
+                "Previous cstate: $2. Current cstate: $3.",
+                tablet->ToString(), cstate.current_term(),
+                SecureShortDebugString(prev_cstate),
+                SecureShortDebugString(cstate));
+            LOG(DFATAL) << msg;
+            continue;
+          }
+        }
+
+        LOG(INFO) << Substitute("T $0 P $1 reported cstate change: $2. New cstate: $3",
+                                tablet->id(), ts_desc->permanent_uuid(),
+                                DiffConsensusStates(prev_cstate, cstate),
+                                SecureShortDebugString(cstate));
+        VLOG(2) << Substitute("Updating cstate for tablet $0 from config reported by $1 "
+            "to that committed in log index $2 with leader state from term $3",
+            tablet_id, ts_desc->ToString(), cstate.committed_config().opid_index(),
+            cstate.current_term());
+
+        // 7d(ii). Update the consensus state.
+        *tablet->mutable_metadata()->mutable_dirty()->pb.mutable_consensus_state() = cstate;
+        tablet_was_mutated = true;
+
+        // 7d(iii). Delete any replicas from the previous config that are not
+        // in the new one.
+        if (FLAGS_master_tombstone_evicted_tablet_replicas) {
+          unordered_set<string> current_member_uuids;
+          for (const auto& p : cstate.committed_config().peers()) {
+            InsertOrDie(&current_member_uuids, p.permanent_uuid());
+          }
+          for (const auto& p : prev_cstate.committed_config().peers()) {
+            const string& peer_uuid = p.permanent_uuid();
+            if (!ContainsKey(current_member_uuids, peer_uuid)) {
+              rpcs.emplace_back(new AsyncDeleteReplica(
+                  master_, peer_uuid, table, tablet_id,
+                  TABLET_DATA_TOMBSTONED, prev_cstate.committed_config().opid_index(),
+                  Substitute("TS $0 not found in new config with opid_index $1",
+                             peer_uuid, cstate.committed_config().opid_index())));
+            }
+          }
+        }
+
+        // 7d(iv). Add a server to the config if it is under-replicated.
+        //
+        // This is an idempotent operation due to a CAS enforced on the
+        // committed config's opid_index.
+        if (FLAGS_master_add_server_when_underreplicated &&
+            CountVoters(cstate.committed_config()) < table->metadata().state().pb.num_replicas()) {
+          rpcs.emplace_back(new AsyncAddServerTask(master_, tablet, cstate, &rng_));
+        }
+      }
+    }
+
+    // 8. Send an AlterSchema RPC if the tablet has an old schema version.
+    uint32_t table_schema_version = table->metadata().state().pb.version();
+    if (report.has_schema_version() &&
+        report.schema_version() != table_schema_version) {
+      if (report.schema_version() > table_schema_version) {
+        LOG(ERROR) << Substitute("TS $0 has reported a schema version greater "
+            "than the current one for tablet $1. Expected version $2 got $3 (corruption)",
+            ts_desc->ToString(), tablet->ToString(), table_schema_version,
+            report.schema_version());
+      } else {
+        LOG(INFO) << Substitute("TS $0 does not have the latest schema for tablet $1. "
+            "Expected version $2 got $3", ts_desc->ToString(), tablet->ToString(),
+            table_schema_version, report.schema_version());
+      }
+
+      // It's possible that the tablet being reported is a laggy replica, and
+      // in fact the leader has already received an AlterTable RPC. That's OK,
+      // though -- it'll safely ignore it if we send another.
+      rpcs.emplace_back(new AsyncAlterTable(master_, tablet));
+    }
+
+    // 9. If the tablet was mutated, add it to the tablets to be re-persisted.
+    //
+    // Done here and not on a per-mutation basis to avoid duplicate entries.
+    if (tablet_was_mutated) {
+      mutated_tablets.push_back(tablet);
+    }
+  }
+
+  // 10. Unlock the tables; we no longer need to access their state.
+  tables_lock.Unlock();
+
+  // 11. Write all tablet mutations to the catalog table.
+  //
+  // SysCatalogTable::Write will short-circuit the case where the data has not
+  // in fact changed since the previous version and avoid any unnecessary mutations.
+  SysCatalogTable::Actions actions;
+  actions.tablets_to_update = std::move(mutated_tablets);
+  Status s = sys_catalog_->Write(actions);
+  if (!s.ok()) {
+    LOG(ERROR) << Substitute(
+        "Error updating tablets from $0: $1. Tablet report was: $2",
+        ts_desc->permanent_uuid(), s.ToString(), SecureShortDebugString(full_report));
+    return s;
+  }
+
+  // Having successfully written the tablet mutations, this function cannot
+  // fail from here on out.
+
+  // 12. Publish the in-memory tablet mutations and release the locks.
+  tablets_lock.Commit();
+
+  // 13. Process all tablet schema version changes.
+  //
+  // This is separate from tablet state mutations because only tablet in-memory
+  // state (and table on-disk state) is changed.
+  for (const auto& e : tablet_infos) {
+    const string& tablet_id = e.first;
+    const scoped_refptr<TabletInfo>& tablet = e.second;
+    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
+    if (report.has_schema_version()) {
+      HandleTabletSchemaVersionReport(tablet, report.schema_version());
+    }
+  }
+
+  // 14. Send all queued RPCs.
+  for (auto& rpc : rpcs) {
+    if (rpc->table() != nullptr) {
+      rpc->table()->AddTask(rpc.get());
+    } else {
+      // This is a floating task (since the table does not exist) created in
+      // response to a tablet report.
+      rpc->AddRef();
+    }
+    WARN_NOT_OK(rpc->Run(), Substitute("Failed to send $0", rpc->description()));
+    rpc.release();
+  }
+
+  return Status::OK();
+}
+
 void CatalogManager::SendAlterTableRequest(const scoped_refptr<TableInfo>& table) {
-  vector<scoped_refptr<TabletInfo> > tablets;
+  vector<scoped_refptr<TabletInfo>> tablets;
   table->GetAllTablets(&tablets);
 
   for (const scoped_refptr<TabletInfo>& tablet : tablets) {
-    SendAlterTabletRequest(tablet);
+    auto call = new AsyncAlterTable(master_, tablet);
+    table->AddTask(call);
+    WARN_NOT_OK(call->Run(), "Failed to send alter table request");
   }
 }
 
-void CatalogManager::SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tablet) {
-  auto call = new AsyncAlterTable(master_, tablet);
-  tablet->table()->AddTask(call);
-  WARN_NOT_OK(call->Run(), "Failed to send alter table request");
-}
-
 void CatalogManager::SendDeleteTableRequest(const scoped_refptr<TableInfo>& table,
                                             const string& deletion_msg) {
-  vector<scoped_refptr<TabletInfo> > tablets;
+  vector<scoped_refptr<TabletInfo>> tablets;
   table->GetAllTablets(&tablets);
 
   for (const scoped_refptr<TabletInfo>& tablet : tablets) {
@@ -3443,46 +3488,15 @@ void CatalogManager::SendDeleteTabletRequest(const scoped_refptr<TabletInfo>& ta
       << "Sending DeleteTablet for " << cstate.committed_config().peers().size()
       << " replicas of tablet " << tablet->id();
   for (const auto& peer : cstate.committed_config().peers()) {
-    SendDeleteReplicaRequest(tablet->id(), TABLET_DATA_DELETED,
-                             boost::none, tablet->table(),
-                             peer.permanent_uuid(), deletion_msg);
+    AsyncDeleteReplica* call = new AsyncDeleteReplica(
+        master_, peer.permanent_uuid(), tablet->table(), tablet->id(),
+        TABLET_DATA_DELETED, boost::none, deletion_msg);
+    tablet->table()->AddTask(call);
+    WARN_NOT_OK(call->Run(), Substitute(
+        "Failed to send DeleteReplica request for tablet $0", tablet->id()));
   }
 }
 
-void CatalogManager::SendDeleteReplicaRequest(
-    const string& tablet_id,
-    TabletDataState delete_type,
-    const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
-    const scoped_refptr<TableInfo>& table,
-    const string& ts_uuid,
-    const string& reason) {
-  AsyncDeleteReplica* call =
-      new AsyncDeleteReplica(master_, ts_uuid, table,
-                             tablet_id, delete_type, cas_config_opid_index_less_or_equal,
-                             reason);
-  if (table != nullptr) {
-    table->AddTask(call);
-  } else {
-    // This is a floating task (since the table does not exist)
-    // created as response to a tablet report.
-    call->AddRef();
-  }
-  WARN_NOT_OK(call->Run(),
-              Substitute("Failed to send DeleteReplica request for tablet $0", tablet_id));
-}
-
-void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
-                                          const ConsensusStatePB& cstate) {
-  auto task = new AsyncAddServerTask(master_, tablet, cstate, &rng_);
-  tablet->table()->AddTask(task);
-  WARN_NOT_OK(task->Run(),
-              Substitute("Failed to send AddServer request for tablet $0", tablet->id()));
-  // We can't access 'task' after calling Run() because it may delete itself
-  // inside Run() in the case that the tablet has no known leader.
-  LOG_WITH_PREFIX(INFO)
-      << "Started AddServer task for tablet " << tablet->id();
-}
-
 void CatalogManager::ExtractTabletsToProcess(
     vector<scoped_refptr<TabletInfo>>* tablets_to_process) {
 
@@ -3625,7 +3639,7 @@ void CatalogManager::HandleAssignCreatingTablet(const scoped_refptr<TabletInfo>&
 
 // TODO(unknown): we could batch the IO onto a background thread.
 //                but this is following the current HandleReportedTablet()
-Status CatalogManager::HandleTabletSchemaVersionReport(
+void CatalogManager::HandleTabletSchemaVersionReport(
     const scoped_refptr<TabletInfo>& tablet,
     uint32_t version) {
   // Update the schema version if it's the latest
@@ -3635,12 +3649,12 @@ Status CatalogManager::HandleTabletSchemaVersionReport(
   const scoped_refptr<TableInfo>& table = tablet->table();
   TableMetadataLock l(table.get(), LockMode::WRITE);
   if (l.data().is_deleted() || l.data().pb.state() != SysTablesEntryPB::ALTERING) {
-    return Status::OK();
+    return;
   }
 
   uint32_t current_version = l.data().pb.version();
   if (table->IsAlterInProgress(current_version)) {
-    return Status::OK();
+    return;
   }
 
   // Update the state from altering to running and remove the last fully
@@ -3655,13 +3669,12 @@ Status CatalogManager::HandleTabletSchemaVersionReport(
   if (!s.ok()) {
     LOG_WITH_PREFIX(WARNING)
         << "An error occurred while updating sys-tables: " << s.ToString();
-    return s;
+    return;
   }
 
   l.Commit();
   LOG_WITH_PREFIX(INFO) << Substitute("$0 alter complete (version $1)",
                                       table->ToString(), current_version);
-  return Status::OK();
 }
 
 Status CatalogManager::ProcessPendingAssignments(

http://git-wip-us.apache.org/repos/asf/kudu/blob/29a7568d/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 0a06912..41d0134 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -39,7 +39,6 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/ts_manager.h"
-#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tserver/tablet_replica_lookup.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/cow_object.h"
@@ -50,11 +49,6 @@
 #include "kudu/util/rw_mutex.h"
 #include "kudu/util/status.h"
 
-namespace boost {
-template <class T>
-class optional;
-}
-
 namespace kudu {
 
 class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
@@ -568,8 +562,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // The RPC context is provided for logging/tracing purposes,
   // but this function does not itself respond to the RPC.
   Status ProcessTabletReport(TSDescriptor* ts_desc,
-                             const TabletReportPB& report,
-                             TabletReportUpdatesPB *report_update,
+                             const TabletReportPB& full_report,
+                             TabletReportUpdatesPB* full_report_update,
                              rpc::RpcContext* rpc);
 
   SysCatalogTable* sys_catalog() { return sys_catalog_.get(); }
@@ -728,20 +722,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   Status FindTable(const TableIdentifierPB& table_identifier,
                    scoped_refptr<TableInfo>* table_info);
 
-  // Handle one of the tablets in a tablet reported.
-  // Requires that the lock is already held.
-  Status HandleReportedTablet(TSDescriptor* ts_desc,
-                              const ReportedTabletPB& report,
-                              ReportedTabletUpdatesPB *report_updates);
-  // Performs metadata updates and sends RPCs based on the Raft-related changes
-  // to 'tablet' described in 'report'.
-  //
-  // 'table_lock' must be held for reading and 'tablet_lock' for writing.
-  Status HandleRaftConfigChanged(const ReportedTabletPB& report,
-                                 const scoped_refptr<TabletInfo>& tablet,
-                                 const TableMetadataLock& table_lock,
-                                 TabletMetadataLock* tablet_lock);
-
   // Extract the set of tablets that must be processed because not running yet.
   void ExtractTabletsToProcess(std::vector<scoped_refptr<TabletInfo>>* tablets_to_process);
 
@@ -805,8 +785,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                                   DeferredAssignmentActions* deferred,
                                   scoped_refptr<TabletInfo>* new_tablet);
 
-  Status HandleTabletSchemaVersionReport(const scoped_refptr<TabletInfo>& tablet,
-                                         uint32_t version);
+  void HandleTabletSchemaVersionReport(const scoped_refptr<TabletInfo>& tablet,
+                                       uint32_t version);
 
   // Send the "create tablet request" to all peers of a particular tablet.
   //.
@@ -827,10 +807,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Send the "alter table request" to all tablets of the specified table.
   void SendAlterTableRequest(const scoped_refptr<TableInfo>& table);
 
-  // Start the background task to send the AlterTable() RPC to the leader for this
-  // tablet.
-  void SendAlterTabletRequest(const scoped_refptr<TabletInfo>& tablet);
-
   // Send the "delete tablet request" to all replicas of all tablets of the
   // specified table.
   void SendDeleteTableRequest(const scoped_refptr<TableInfo>& table,
@@ -843,20 +819,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                                const TabletMetadataLock& tablet_lock,
                                const std::string& deletion_msg);
 
-  // Send the "delete tablet request" to a particular replica (i.e. TS and
-  // tablet combination). The specified 'reason' will be logged on the TS.
-  void SendDeleteReplicaRequest(const std::string& tablet_id,
-                                tablet::TabletDataState delete_type,
-                                const boost::optional<int64_t>& cas_config_opid_index_less_or_equal,
-                                const scoped_refptr<TableInfo>& table,
-                                const std::string& ts_uuid,
-                                const std::string& reason);
-
-  // Start a task to change the config to add an additional voter because the
-  // specified tablet is under-replicated.
-  void SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
-                            const consensus::ConsensusStatePB& cstate);
-
   std::string GenerateId() { return oid_generator_.Next(); }
 
   // Conventional "T xxx P yyy: " prefix for logging.