You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/08/01 22:11:39 UTC
[2/9] kudu git commit: KUDU-1358 (part 1): master should accept heartbeat even if follower
KUDU-1358 (part 1): master should accept heartbeat even if follower
This patch changes the master's heartbeat acceptance code so that heartbeats
are not rejected outright if the master is a follower. To be specific, a
follower master ignores any tablet report carried by the heartbeat, but still
processes the rest of the heartbeat, which is just enough to warm its
TSDescriptor cache. That way, if this master is later elected leader, it can
respond to a CreateTable() even before the first round of heartbeats.
I reduced the complexity of the "should this tserver register or send a full
tablet report?" dance by removing TSDescriptor.has_tablet_report_. It was
used to guarantee a full tablet report in the event that 1) the tserver is
sending incremental tablet reports, and 2) the master has already registered
the tserver. I don't think this exact sequence of events is actually
possible; the only way a master can "lose" a cached TSDescriptor is if the
master is restarted, at which point it loses the tserver registration too.
Plus, all the unit tests passed (in slow mode).
I also snuck in a fix to TSManager::RegisterTS: it wasn't actually returning
a TSDescriptor in its out parameter.
Change-Id: I578674927b65b4171e8437de8515130e4a0ed139
Reviewed-on: http://gerrit.cloudera.org:8080/3609
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8618ae2d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8618ae2d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8618ae2d
Branch: refs/heads/master
Commit: 8618ae2d6ce867879fe550c107f6178b167d507f
Parents: 8d89d5f
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Jun 10 16:42:58 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Jul 31 23:44:58 2016 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/alter_table-test.cc | 1 -
.../master_replication-itest.cc | 49 +++++++++--
src/kudu/integration-tests/mini_cluster.cc | 35 +++++---
src/kudu/integration-tests/mini_cluster.h | 16 +++-
src/kudu/integration-tests/registration-test.cc | 6 +-
.../integration-tests/table_locations-itest.cc | 3 +-
src/kudu/master/catalog_manager.cc | 8 --
src/kudu/master/catalog_manager.h | 24 +++++-
src/kudu/master/master-test.cc | 70 ++++++++++++++--
src/kudu/master/master_service.cc | 86 ++++++++------------
src/kudu/master/ts_descriptor.cc | 19 +----
src/kudu/master/ts_descriptor.h | 10 +--
src/kudu/master/ts_manager.cc | 18 ++--
13 files changed, 223 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/alter_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 8c97a39..85321ff 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -108,7 +108,6 @@ class AlterTableTest : public KuduTest {
opts.num_tablet_servers = num_replicas();
cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(cluster_->Start());
- ASSERT_OK(cluster_->WaitForTabletServerCount(num_replicas()));
CHECK_OK(KuduClientBuilder()
.add_master_server_addr(cluster_->mini_master()->bound_rpc_addr_str())
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/master_replication-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc
index 6c5c429..47e47ea 100644
--- a/src/kudu/integration-tests/master_replication-itest.cc
+++ b/src/kudu/integration-tests/master_replication-itest.cc
@@ -26,7 +26,10 @@
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
#include "kudu/util/test_util.h"
using std::vector;
@@ -65,7 +68,6 @@ class MasterReplicationTest : public KuduTest {
KuduTest::SetUp();
cluster_.reset(new MiniCluster(env_.get(), opts_));
ASSERT_OK(cluster_->Start());
- ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
}
virtual void TearDown() OVERRIDE {
@@ -79,7 +81,6 @@ class MasterReplicationTest : public KuduTest {
Status RestartCluster() {
cluster_->Shutdown();
RETURN_NOT_OK(cluster_->Start());
- RETURN_NOT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
return Status::OK();
}
@@ -89,7 +90,6 @@ class MasterReplicationTest : public KuduTest {
SleepFor(MonoDelta::FromMilliseconds(millis));
LOG(INFO) << "Attempting to start the cluster...";
CHECK_OK(cluster_->Start());
- CHECK_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
}
void ListMasterServerAddrs(vector<string>* out) {
@@ -152,8 +152,6 @@ TEST_F(MasterReplicationTest, TestSysTablesReplication) {
ASSERT_OK(CreateClient(&client));
ASSERT_OK(CreateTable(client, kTableId1));
- ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServerReplicas));
-
// Repeat the same for the second table.
ASSERT_OK(CreateTable(client, kTableId2));
ASSERT_NO_FATAL_FAILURE(VerifyTableExists(kTableId2));
@@ -211,5 +209,46 @@ TEST_F(MasterReplicationTest, TestCycleThroughAllMasters) {
ASSERT_OK(ThreadJoiner(start_thread.get()).Join());
}
+// Test that every master accepts heartbeats, and that a heartbeat to any
+// master updates its TSDescriptor cache.
+TEST_F(MasterReplicationTest, TestHeartbeatAcceptedByAnyMaster) {
+ // Register a fake tserver with every master.
+ TSToMasterCommonPB common;
+ common.mutable_ts_instance()->set_permanent_uuid("fake-ts-uuid");
+ common.mutable_ts_instance()->set_instance_seqno(1);
+ TSRegistrationPB fake_reg;
+ HostPortPB* pb = fake_reg.add_rpc_addresses();
+ pb->set_host("localhost");
+ pb->set_port(1000);
+ pb = fake_reg.add_http_addresses();
+ pb->set_host("localhost");
+ pb->set_port(2000);
+ std::shared_ptr<rpc::Messenger> messenger;
+ rpc::MessengerBuilder bld("Client");
+ ASSERT_OK(bld.Build(&messenger));
+ for (int i = 0; i < cluster_->num_masters(); i++) {
+ TSHeartbeatRequestPB req;
+ TSHeartbeatResponsePB resp;
+ rpc::RpcController rpc;
+
+ req.mutable_common()->CopyFrom(common);
+ req.mutable_registration()->CopyFrom(fake_reg);
+
+ MasterServiceProxy proxy(messenger,
+ cluster_->mini_master(i)->bound_rpc_addr());
+
+ // All masters (including followers) should accept the heartbeat.
+ ASSERT_OK(proxy.TSHeartbeat(req, &resp, &rpc));
+ SCOPED_TRACE(resp.DebugString());
+ ASSERT_FALSE(resp.has_error());
+ }
+
+ // Now each master should have four registered tservers.
+ vector<std::shared_ptr<TSDescriptor>> descs;
+ ASSERT_OK(cluster_->WaitForTabletServerCount(
+ kNumTabletServerReplicas + 1,
+ MiniCluster::MatchMode::DO_NOT_MATCH_TSERVERS, &descs));
+}
+
} // namespace master
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index 2b5ef67..62e4531 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -264,12 +264,13 @@ Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
}
Status MiniCluster::WaitForTabletServerCount(int count) {
- vector<shared_ptr<master::TSDescriptor> > descs;
- return WaitForTabletServerCount(count, &descs);
+ vector<shared_ptr<master::TSDescriptor>> descs;
+ return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
}
Status MiniCluster::WaitForTabletServerCount(int count,
- vector<shared_ptr<TSDescriptor> >* descs) {
+ MatchMode mode,
+ vector<shared_ptr<TSDescriptor>>* descs) {
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
@@ -279,15 +280,27 @@ Status MiniCluster::WaitForTabletServerCount(int count,
// Do a second step of verification to verify that the descs that we got
// are aligned (same uuid/seqno) with the TSs that we have in the cluster.
int match_count = 0;
- for (const shared_ptr<TSDescriptor>& desc : *descs) {
- for (auto mini_tablet_server : mini_tablet_servers_) {
- auto ts = mini_tablet_server->server();
- if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
- ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
- match_count++;
- break;
+ switch (mode) {
+ case MatchMode::MATCH_TSERVERS:
+ // GetAllDescriptors() may return servers that are no longer online.
+ // Do a second step of verification to verify that the descs that we got
+ // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+ for (const shared_ptr<TSDescriptor>& desc : *descs) {
+ for (auto mini_tablet_server : mini_tablet_servers_) {
+ auto ts = mini_tablet_server->server();
+ if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
+ ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
+ match_count++;
+ break;
+ }
+ }
}
- }
+ break;
+ case MatchMode::DO_NOT_MATCH_TSERVERS:
+ match_count = descs->size();
+ break;
+ default:
+ LOG(FATAL) << "Invalid match mode";
}
if (match_count == count) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.h b/src/kudu/integration-tests/mini_cluster.h
index e682316..12a5211 100644
--- a/src/kudu/integration-tests/mini_cluster.h
+++ b/src/kudu/integration-tests/mini_cluster.h
@@ -141,9 +141,21 @@ class MiniCluster {
// Wait until the number of registered tablet servers reaches the given
// count. Returns Status::TimedOut if the desired count is not achieved
// within kRegistrationWaitTimeSeconds.
+ enum class MatchMode {
+ // Ensure that the tservers retrieved from each master match up against the
+ // tservers defined in this cluster. The matching is done via
+ // NodeInstancePBs comparisons. If even one match fails, the retrieved
+ // response is considered to be malformed and is retried.
+ //
+ // Note: tservers participate in matching even if they are shut down.
+ MATCH_TSERVERS,
+
+ // Do not perform any matching on the retrieved tservers.
+ DO_NOT_MATCH_TSERVERS,
+ };
Status WaitForTabletServerCount(int count);
- Status WaitForTabletServerCount(int count,
- std::vector<std::shared_ptr<master::TSDescriptor> >* descs);
+ Status WaitForTabletServerCount(int count, MatchMode mode,
+ std::vector<std::shared_ptr<master::TSDescriptor>>* descs);
// Create a client configured to talk to this cluster. Builder may contain
// override options for the client. The master address will be overridden to
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/registration-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index b0a1497..9eeca5f 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -96,7 +96,8 @@ class RegistrationTest : public KuduTest {
TEST_F(RegistrationTest, TestTSRegisters) {
// Wait for the TS to register.
vector<shared_ptr<TSDescriptor> > descs;
- ASSERT_OK(cluster_->WaitForTabletServerCount(1, &descs));
+ ASSERT_OK(cluster_->WaitForTabletServerCount(
+ 1, MiniCluster::MatchMode::MATCH_TSERVERS, &descs));
ASSERT_EQ(1, descs.size());
// Verify that the registration is sane.
@@ -123,7 +124,6 @@ TEST_F(RegistrationTest, TestTSRegisters) {
// Test starting multiple tablet servers and ensuring they both register with the master.
TEST_F(RegistrationTest, TestMultipleTS) {
- ASSERT_OK(cluster_->WaitForTabletServerCount(1));
ASSERT_OK(cluster_->AddTabletServer());
ASSERT_OK(cluster_->WaitForTabletServerCount(2));
}
@@ -135,8 +135,6 @@ TEST_F(RegistrationTest, TestTabletReports) {
string tablet_id_1;
string tablet_id_2;
- ASSERT_OK(cluster_->WaitForTabletServerCount(1));
-
MiniTabletServer* ts = cluster_->mini_tablet_server(0);
string ts_root = cluster_->GetTabletServerFsRoot(0);
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/integration-tests/table_locations-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/table_locations-itest.cc b/src/kudu/integration-tests/table_locations-itest.cc
index 6dfbe56..3f90b18 100644
--- a/src/kudu/integration-tests/table_locations-itest.cc
+++ b/src/kudu/integration-tests/table_locations-itest.cc
@@ -61,13 +61,12 @@ class TableLocationsTest : public KuduTest {
cluster_.reset(new MiniCluster(env_.get(), opts));
ASSERT_OK(cluster_->Start());
- ASSERT_OK(cluster_->WaitForTabletServerCount(kNumTabletServers));
// Create a client proxy to the master.
MessengerBuilder bld("Client");
ASSERT_OK(bld.Build(&client_messenger_));
proxy_.reset(new MasterServiceProxy(client_messenger_,
- cluster_->leader_mini_master()->bound_rpc_addr()));
+ cluster_->mini_master()->bound_rpc_addr()));
}
void TearDown() override {
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6568637..7642ddd 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1485,12 +1485,6 @@ Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
VLOG(2) << "Received tablet report from " <<
RequestorString(rpc) << ": " << report.DebugString();
}
- if (!ts_desc->has_tablet_report() && report.is_incremental()) {
- string msg = "Received an incremental tablet report when a full one was needed";
- LOG(WARNING) << "Invalid tablet report from " << RequestorString(rpc) << ": "
- << msg;
- return Status::IllegalState(msg);
- }
// TODO: on a full tablet report, we may want to iterate over the tablets we think
// the server should have, compare vs the ones being reported, and somehow mark
@@ -1503,8 +1497,6 @@ Status CatalogManager::ProcessTabletReport(TSDescriptor* ts_desc,
Substitute("Error handling $0", reported.ShortDebugString()));
}
- ts_desc->set_has_tablet_report(true);
-
if (report.updated_tablets_size() > 0) {
background_tasks_->WakeIfHasPendingUpdates();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 0196d8d..88a941b 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -344,6 +344,29 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
shared_lock<RWMutex> leader_shared_lock_;
Status catalog_status_;
Status leader_status_;
+
+ DISALLOW_COPY_AND_ASSIGN(ScopedLeaderSharedLock);
+ };
+
+ // Temporarily forces the catalog manager to be a follower. Only for tests!
+ class ScopedLeaderDisablerForTests {
+ public:
+
+ explicit ScopedLeaderDisablerForTests(CatalogManager* catalog)
+ : catalog_(catalog),
+ old_leader_ready_term_(catalog->leader_ready_term_) {
+ catalog_->leader_ready_term_ = -1;
+ }
+
+ ~ScopedLeaderDisablerForTests() {
+ catalog_->leader_ready_term_ = old_leader_ready_term_;
+ }
+
+ private:
+ CatalogManager* catalog_;
+ int64_t old_leader_ready_term_;
+
+ DISALLOW_COPY_AND_ASSIGN(ScopedLeaderDisablerForTests);
};
explicit CatalogManager(Master *master);
@@ -678,7 +701,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf {
std::unordered_set<std::string> reserved_table_names_;
Master *master_;
- Atomic32 closing_;
ObjectIdGenerator oid_generator_;
// Random number generator used for selecting replica locations.
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/master-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master-test.cc b/src/kudu/master/master-test.cc
index 87fb301..8775af1 100644
--- a/src/kudu/master/master-test.cc
+++ b/src/kudu/master/master-test.cc
@@ -130,8 +130,10 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
req.mutable_common()->CopyFrom(common);
ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+ ASSERT_TRUE(resp.leader_master());
ASSERT_TRUE(resp.needs_reregister());
ASSERT_TRUE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.has_tablet_report());
}
vector<shared_ptr<TSDescriptor> > descs;
@@ -154,11 +156,12 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
req.mutable_registration()->CopyFrom(fake_reg);
ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+ ASSERT_TRUE(resp.leader_master());
ASSERT_FALSE(resp.needs_reregister());
- ASSERT_TRUE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.has_tablet_report());
}
- descs.clear();
master_->ts_manager()->GetAllDescriptors(&descs);
ASSERT_EQ(1, descs.size()) << "Should have registered the TS";
TSRegistrationPB reg;
@@ -179,12 +182,33 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
req.mutable_registration()->CopyFrom(fake_reg);
ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+ ASSERT_TRUE(resp.leader_master());
ASSERT_FALSE(resp.needs_reregister());
- ASSERT_TRUE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.has_tablet_report());
+ }
+
+ // If we send the registration RPC while the master isn't the leader, it
+ // shouldn't ask for a full tablet report.
+ {
+ CatalogManager::ScopedLeaderDisablerForTests o(master_->catalog_manager());
+ TSHeartbeatRequestPB req;
+ TSHeartbeatResponsePB resp;
+ RpcController rpc;
+ req.mutable_common()->CopyFrom(common);
+ req.mutable_registration()->CopyFrom(fake_reg);
+ ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+ ASSERT_FALSE(resp.leader_master());
+ ASSERT_FALSE(resp.needs_reregister());
+ ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.has_tablet_report());
}
- // Now send a tablet report
+ // Send a full tablet report, but with the master as a follower. The
+ // report will be ignored.
{
+ CatalogManager::ScopedLeaderDisablerForTests o(master_->catalog_manager());
TSHeartbeatRequestPB req;
TSHeartbeatResponsePB resp;
RpcController rpc;
@@ -194,11 +218,47 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
tr->set_sequence_number(0);
ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+ ASSERT_FALSE(resp.leader_master());
+ ASSERT_FALSE(resp.needs_reregister());
+ ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_FALSE(resp.has_tablet_report());
+ }
+
+ // Now send a full report with the master as leader. The master will process
+ // it; this is reflected in the response.
+ {
+ TSHeartbeatRequestPB req;
+ TSHeartbeatResponsePB resp;
+ RpcController rpc;
+ req.mutable_common()->CopyFrom(common);
+ TabletReportPB* tr = req.mutable_tablet_report();
+ tr->set_is_incremental(false);
+ tr->set_sequence_number(0);
+ ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+ ASSERT_TRUE(resp.leader_master());
+ ASSERT_FALSE(resp.needs_reregister());
+ ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_TRUE(resp.has_tablet_report());
+ }
+
+ // Having sent a full report, an incremental report will also be processed.
+ {
+ TSHeartbeatRequestPB req;
+ TSHeartbeatResponsePB resp;
+ RpcController rpc;
+ req.mutable_common()->CopyFrom(common);
+ TabletReportPB* tr = req.mutable_tablet_report();
+ tr->set_is_incremental(true);
+ tr->set_sequence_number(0);
+ ASSERT_OK(proxy_->TSHeartbeat(req, &resp, &rpc));
+
+ ASSERT_TRUE(resp.leader_master());
ASSERT_FALSE(resp.needs_reregister());
ASSERT_FALSE(resp.needs_full_tablet_report());
+ ASSERT_TRUE(resp.has_tablet_report());
}
- descs.clear();
master_->ts_manager()->GetAllDescriptors(&descs);
ASSERT_EQ(1, descs.size()) << "Should still only have one TS registered";
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/master_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 9e7b08f..23dc37b 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -45,6 +45,7 @@ using consensus::RaftPeerPB;
using std::string;
using std::vector;
using std::shared_ptr;
+using strings::Substitute;
namespace {
@@ -82,78 +83,62 @@ void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
if (!l.CheckIsInitializedOrRespond(resp, rpc)) {
return;
}
+ bool is_leader_master = l.leader_status().ok();
+ // 2. All responses contain this.
resp->mutable_master_instance()->CopyFrom(server_->instance_pb());
- if (!l.leader_status().ok()) {
- // For the time being, ignore heartbeats sent to non-leader distributed
- // masters.
- //
- // TODO KUDU-493 Allow all master processes to receive heartbeat
- // information: by having the TabletServers send heartbeats to all
- // masters, or by storing heartbeat information in a replicated
- // SysTable.
- LOG(WARNING) << "Received a heartbeat, but this Master instance is not a leader or a "
- << "single Master: " << l.leader_status().ToString();
- resp->set_leader_master(false);
- rpc->RespondSuccess();
- return;
- }
- resp->set_leader_master(true);
+ resp->set_leader_master(is_leader_master);
+ // 3. Register or look up the tserver.
shared_ptr<TSDescriptor> ts_desc;
- // If the TS is registering, register in the TS manager.
if (req->has_registration()) {
Status s = server_->ts_manager()->RegisterTS(req->common().ts_instance(),
req->registration(),
&ts_desc);
if (!s.ok()) {
- LOG(WARNING) << "Unable to register tablet server (" << rpc->requestor_string() << "): "
- << s.ToString();
+ LOG(WARNING) << Substitute("Unable to register tserver ($0): $1",
+ rpc->requestor_string(), s.ToString());
// TODO: add service-specific errors
rpc->RespondFailure(s);
return;
}
+ } else {
+ Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
+ if (s.IsNotFound()) {
+ LOG(INFO) << Substitute("Got heartbeat from unknown tserver ($0) as $1; "
+ "Asking this server to re-register.",
+ req->common().ts_instance().ShortDebugString(), rpc->requestor_string());
+ resp->set_needs_reregister(true);
+
+ // Don't bother asking for a full tablet report if we're a follower;
+ // it'll just get ignored anyway.
+ resp->set_needs_full_tablet_report(is_leader_master);
+
+ rpc->RespondSuccess();
+ return;
+ } else if (!s.ok()) {
+ LOG(WARNING) << Substitute("Unable to look up tserver for heartbeat "
+ "request $0 from $1: $2", req->DebugString(),
+ rpc->requestor_string(), s.ToString());
+ rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup tserver"));
+ return;
+ }
}
- // TODO: KUDU-86 if something fails after this point the TS will not be able
- // to register again.
-
- // Look up the TS -- if it just registered above, it will be found here.
- // This allows the TS to register and tablet-report in the same RPC.
- Status s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
- if (s.IsNotFound()) {
- LOG(INFO) << "Got heartbeat from unknown tablet server { "
- << req->common().ts_instance().ShortDebugString()
- << " } as " << rpc->requestor_string()
- << "; Asking this server to re-register.";
- resp->set_needs_reregister(true);
- resp->set_needs_full_tablet_report(true);
- rpc->RespondSuccess();
- return;
- } else if (!s.ok()) {
- LOG(WARNING) << "Unable to look up tablet server for heartbeat request "
- << req->DebugString() << " from " << rpc->requestor_string()
- << "\nStatus: " << s.ToString();
- rpc->RespondFailure(s.CloneAndPrepend("Unable to lookup TS"));
- return;
- }
-
+ // 4. Update tserver soft state based on the heartbeat contents.
ts_desc->UpdateHeartbeatTime();
ts_desc->set_num_live_replicas(req->num_live_tablets());
- if (req->has_tablet_report()) {
- s = server_->catalog_manager()->ProcessTabletReport(
- ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
+ // 5. Only leaders handle tablet reports.
+ if (is_leader_master && req->has_tablet_report()) {
+ Status s = server_->catalog_manager()->ProcessTabletReport(
+ ts_desc.get(), req->tablet_report(), resp->mutable_tablet_report(), rpc);
if (!s.ok()) {
rpc->RespondFailure(s.CloneAndPrepend("Failed to process tablet report"));
return;
}
}
- if (!ts_desc->has_tablet_report()) {
- resp->set_needs_full_tablet_report(true);
- }
-
rpc->RespondSuccess();
}
@@ -297,11 +282,6 @@ void MasterServiceImpl::GetTableSchema(const GetTableSchemaRequestPB* req,
void MasterServiceImpl::ListTabletServers(const ListTabletServersRequestPB* req,
ListTabletServersResponsePB* resp,
rpc::RpcContext* rpc) {
- CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
- if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
- return;
- }
-
vector<std::shared_ptr<TSDescriptor> > descs;
server_->ts_manager()->GetAllDescriptors(&descs);
for (const std::shared_ptr<TSDescriptor>& desc : descs) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_descriptor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.cc b/src/kudu/master/ts_descriptor.cc
index 80f500e..003faeb 100644
--- a/src/kudu/master/ts_descriptor.cc
+++ b/src/kudu/master/ts_descriptor.cc
@@ -28,6 +28,7 @@
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/util/net/net_util.h"
+using std::make_shared;
using std::shared_ptr;
namespace kudu {
@@ -35,8 +36,8 @@ namespace master {
Status TSDescriptor::RegisterNew(const NodeInstancePB& instance,
const TSRegistrationPB& registration,
- gscoped_ptr<TSDescriptor>* desc) {
- gscoped_ptr<TSDescriptor> ret(new TSDescriptor(instance.permanent_uuid()));
+ shared_ptr<TSDescriptor>* desc) {
+ shared_ptr<TSDescriptor> ret(make_shared<TSDescriptor>(instance.permanent_uuid()));
RETURN_NOT_OK(ret->Register(instance, registration));
desc->swap(ret);
return Status::OK();
@@ -46,7 +47,6 @@ TSDescriptor::TSDescriptor(std::string perm_id)
: permanent_uuid_(std::move(perm_id)),
latest_seqno_(-1),
last_heartbeat_(MonoTime::Now(MonoTime::FINE)),
- has_tablet_report_(false),
recent_replica_creations_(0),
last_replica_creations_decay_(MonoTime::Now(MonoTime::FINE)),
num_live_replicas_(0) {
@@ -87,9 +87,6 @@ Status TSDescriptor::Register(const NodeInstancePB& instance,
}
latest_seqno_ = instance.instance_seqno();
- // After re-registering, make the TS re-report its tablets.
- has_tablet_report_ = false;
-
registration_.reset(new TSRegistrationPB(registration));
ts_admin_proxy_.reset();
consensus_proxy_.reset();
@@ -113,16 +110,6 @@ int64_t TSDescriptor::latest_seqno() const {
return latest_seqno_;
}
-bool TSDescriptor::has_tablet_report() const {
- std::lock_guard<simple_spinlock> l(lock_);
- return has_tablet_report_;
-}
-
-void TSDescriptor::set_has_tablet_report(bool has_report) {
- std::lock_guard<simple_spinlock> l(lock_);
- has_tablet_report_ = has_report;
-}
-
void TSDescriptor::DecayRecentReplicaCreationsUnlocked() {
// In most cases, we won't have any recent replica creations, so
// we don't need to bother calling the clock, etc.
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_descriptor.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_descriptor.h b/src/kudu/master/ts_descriptor.h
index 3522d83..1a571f2 100644
--- a/src/kudu/master/ts_descriptor.h
+++ b/src/kudu/master/ts_descriptor.h
@@ -23,6 +23,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
@@ -55,7 +56,7 @@ class TSDescriptor {
public:
static Status RegisterNew(const NodeInstancePB& instance,
const TSRegistrationPB& registration,
- gscoped_ptr<TSDescriptor>* desc);
+ std::shared_ptr<TSDescriptor>* desc);
virtual ~TSDescriptor();
@@ -73,9 +74,6 @@ class TSDescriptor {
const std::string &permanent_uuid() const { return permanent_uuid_; }
int64_t latest_seqno() const;
- bool has_tablet_report() const;
- void set_has_tablet_report(bool has_report);
-
// Copy the current registration info into the given PB object.
// A safe copy is returned because the internal Registration object
// may be mutated at any point if the tablet server re-registers.
@@ -132,9 +130,6 @@ class TSDescriptor {
// The last time a heartbeat was received for this node.
MonoTime last_heartbeat_;
- // Set to true once this instance has reported all of its tablets.
- bool has_tablet_report_;
-
// The number of times this tablet server has recently been selected to create a
// tablet replica. This value decays back to 0 over time.
double recent_replica_creations_;
@@ -148,6 +143,7 @@ class TSDescriptor {
std::shared_ptr<tserver::TabletServerAdminServiceProxy> ts_admin_proxy_;
std::shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy_;
+ ALLOW_MAKE_SHARED(TSDescriptor);
DISALLOW_COPY_AND_ASSIGN(TSDescriptor);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/8618ae2d/src/kudu/master/ts_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/ts_manager.cc b/src/kudu/master/ts_manager.cc
index 59b2b53..f7601ac 100644
--- a/src/kudu/master/ts_manager.cc
+++ b/src/kudu/master/ts_manager.cc
@@ -21,6 +21,7 @@
#include <vector>
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/util/flag_tags.h"
@@ -34,6 +35,7 @@ TAG_FLAG(tserver_unresponsive_timeout_ms, advanced);
using std::shared_ptr;
using std::string;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace master {
@@ -75,16 +77,18 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
const string& uuid = instance.permanent_uuid();
if (!ContainsKey(servers_by_id_, uuid)) {
- gscoped_ptr<TSDescriptor> new_desc;
+ shared_ptr<TSDescriptor> new_desc;
RETURN_NOT_OK(TSDescriptor::RegisterNew(instance, registration, &new_desc));
- InsertOrDie(&servers_by_id_, uuid, shared_ptr<TSDescriptor>(new_desc.release()));
- LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString()
- << " } with Master";
+ InsertOrDie(&servers_by_id_, uuid, new_desc);
+ LOG(INFO) << Substitute("Registered new tserver $0 with Master",
+ instance.ShortDebugString());
+ desc->swap(new_desc);
} else {
- const shared_ptr<TSDescriptor>& found = FindOrDie(servers_by_id_, uuid);
+ shared_ptr<TSDescriptor> found(FindOrDie(servers_by_id_, uuid));
RETURN_NOT_OK(found->Register(instance, registration));
- LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString()
- << " } with Master";
+ LOG(INFO) << Substitute("Re-registered known tserver $0 with Master",
+ instance.ShortDebugString());
+ desc->swap(found);
}
return Status::OK();