Posted to commits@kudu.apache.org by al...@apache.org on 2017/11/27 06:18:47 UTC
kudu git commit: KUDU-1097: 3-4-3 behavior for catalog manager
Repository: kudu
Updated Branches:
refs/heads/master 88e39bad1 -> 87dcaf34d
KUDU-1097: 3-4-3 behavior for catalog manager
Updated the catalog_manager to behave as specified in the 3-4-3
re-replication improvement v1 design implementation plan. This
update includes the changes referred to as 'patch 2' and 'patch 4'.
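For context, under the 3-4-3 scheme the catalog manager first adds a
replacement NON_VOTER replica (3 -> 4) and evicts the failed voter only
after the replacement has been promoted (4 -> 3), instead of evicting
first as in the older 3-2-3 behavior. A minimal illustrative sketch of
that sequence follows, built on the two predicates this patch introduces;
it assumes the Kudu source tree for the includes, the AddVoter() helper
and the replica UUIDs are placeholders, and the protobuf accessor names
follow the consensus metadata protobufs used in the diff below.

#include <iostream>
#include <string>

#include "kudu/consensus/metadata.pb.h"  // RaftConfigPB, RaftPeerPB, HealthReportPB
#include "kudu/consensus/quorum_util.h"  // IsUnderReplicated(), CanEvictReplica()

using kudu::consensus::HealthReportPB;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;

namespace {
// Hypothetical helper: add a voter replica with the given health status.
void AddVoter(RaftConfigPB* config, const std::string& uuid,
              HealthReportPB::HealthStatus health) {
  RaftPeerPB* peer = config->add_peers();
  peer->set_permanent_uuid(uuid);
  peer->set_member_type(RaftPeerPB::VOTER);
  peer->mutable_health_report()->set_overall_health(health);
}
} // anonymous namespace

int main() {
  constexpr int kReplicationFactor = 3;
  RaftConfigPB config;
  AddVoter(&config, "A", HealthReportPB::HEALTHY);
  AddVoter(&config, "B", HealthReportPB::HEALTHY);
  AddVoter(&config, "C", HealthReportPB::FAILED);

  // 3 -> 4: the failed voter makes the config under-replicated, so the
  // catalog manager adds a NON_VOTER marked with the PROMOTE attribute.
  if (kudu::consensus::IsUnderReplicated(config, kReplicationFactor)) {
    RaftPeerPB* peer = config.add_peers();
    peer->set_permanent_uuid("D");
    peer->set_member_type(RaftPeerPB::NON_VOTER);
    peer->mutable_attrs()->set_promote(true);
    peer->mutable_health_report()->set_overall_health(HealthReportPB::HEALTHY);
  }

  // 4 -> 3: once the leader promotes D to VOTER, the failed replica C
  // becomes evictable.
  RaftPeerPB* promoted = config.mutable_peers()->Mutable(3);
  promoted->set_member_type(RaftPeerPB::VOTER);
  promoted->mutable_attrs()->clear_promote();
  std::string to_evict;
  if (kudu::consensus::CanEvictReplica(config, kReplicationFactor, &to_evict)) {
    std::cout << "evicting replica " << to_evict << std::endl;  // prints "C"
  }
  return 0;
}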
Change-Id: I6f0469ac641bf7a03dbef01eaa3f1b58a5bf5d27
Reviewed-on: http://gerrit.cloudera.org:8080/8619
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/87dcaf34
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/87dcaf34
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/87dcaf34
Branch: refs/heads/master
Commit: 87dcaf34d3495f87164b80e03b757efc12be2fe0
Parents: 88e39ba
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Nov 20 12:04:40 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Nov 27 06:17:45 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/quorum_util-test.cc | 442 ++++++++++++++++++-
src/kudu/consensus/quorum_util.cc | 187 ++++++++
src/kudu/consensus/quorum_util.h | 12 +
.../raft_consensus_nonvoter-itest.cc | 153 ++++++-
src/kudu/master/catalog_manager.cc | 343 +++++++++-----
src/kudu/master/catalog_manager.h | 2 +-
6 files changed, 1015 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util-test.cc b/src/kudu/consensus/quorum_util-test.cc
index 8864977..cede1db 100644
--- a/src/kudu/consensus/quorum_util-test.cc
+++ b/src/kudu/consensus/quorum_util-test.cc
@@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
#include <string>
+#include <utility>
+#include <vector>
+#include <boost/optional/optional.hpp>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
@@ -25,26 +29,68 @@
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
namespace kudu {
namespace consensus {
-using std::string;
+// Handy notation of membership types used by AddPeer(), etc.
+constexpr auto N = RaftPeerPB::NON_VOTER;
+constexpr auto U = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
+constexpr auto V = RaftPeerPB::VOTER;
+
+typedef std::pair<string, bool> Attr;
// Add a consensus peer into the specified configuration.
static void AddPeer(RaftConfigPB* config,
const string& uuid,
- RaftPeerPB::MemberType type) {
+ RaftPeerPB::MemberType type,
+ boost::optional<char> overall_health = boost::none,
+ vector<Attr> attrs = {}) {
RaftPeerPB* peer = config->add_peers();
peer->set_permanent_uuid(uuid);
peer->mutable_last_known_addr()->set_host(uuid + ".example.com");
peer->set_member_type(type);
+ if (overall_health) {
+ unique_ptr<HealthReportPB> health_report(new HealthReportPB);
+ switch (*overall_health) {
+ case '+':
+ health_report->set_overall_health(HealthReportPB::HEALTHY);
+ break;
+ case '-':
+ health_report->set_overall_health(HealthReportPB::FAILED);
+ break;
+ case '?':
+ health_report->set_overall_health(HealthReportPB::UNKNOWN);
+ break;
+ default:
+ FAIL() << *overall_health << ": unexpected replica health status";
+ break;
+ }
+ peer->set_allocated_health_report(health_report.release());
+ }
+ if (!attrs.empty()) {
+ unique_ptr<RaftPeerAttrsPB> attrs_pb(new RaftPeerAttrsPB);
+ for (const auto& attr : attrs) {
+ if (attr.first == "PROMOTE") {
+ attrs_pb->set_promote(attr.second);
+ } else if (attr.first == "REPLACE") {
+ attrs_pb->set_replace(attr.second);
+ } else {
+ FAIL() << attr.first << ": unexpected attribute to set";
+ }
+ }
+ peer->set_allocated_attrs(attrs_pb.release());
+ }
}
TEST(QuorumUtilTest, TestMemberExtraction) {
RaftConfigPB config;
- AddPeer(&config, "A", RaftPeerPB::VOTER);
- AddPeer(&config, "B", RaftPeerPB::VOTER);
- AddPeer(&config, "C", RaftPeerPB::VOTER);
+ AddPeer(&config, "A", V);
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", V);
// Basic test for GetRaftConfigMember().
RaftPeerPB* peer_pb;
@@ -65,9 +111,9 @@ TEST(QuorumUtilTest, TestMemberExtraction) {
TEST(QuorumUtilTest, TestDiffConsensusStates) {
ConsensusStatePB old_cs;
- AddPeer(old_cs.mutable_committed_config(), "A", RaftPeerPB::VOTER);
- AddPeer(old_cs.mutable_committed_config(), "B", RaftPeerPB::VOTER);
- AddPeer(old_cs.mutable_committed_config(), "C", RaftPeerPB::VOTER);
+ AddPeer(old_cs.mutable_committed_config(), "A", V);
+ AddPeer(old_cs.mutable_committed_config(), "B", V);
+ AddPeer(old_cs.mutable_committed_config(), "C", V);
old_cs.set_current_term(1);
old_cs.set_leader_uuid("A");
old_cs.mutable_committed_config()->set_opid_index(1);
@@ -102,7 +148,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
{
auto new_cs = old_cs;
new_cs.mutable_committed_config()->set_opid_index(2);
- AddPeer(new_cs.mutable_committed_config(), "D", RaftPeerPB::NON_VOTER);
+ AddPeer(new_cs.mutable_committed_config(), "D", N);
EXPECT_EQ("config changed from index 1 to 2, "
"NON_VOTER D (D.example.com) added",
@@ -114,7 +160,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
auto new_cs = old_cs;
new_cs.mutable_committed_config()->set_opid_index(2);
new_cs.mutable_committed_config()
- ->mutable_peers()->Mutable(2)->set_member_type(RaftPeerPB::NON_VOTER);
+ ->mutable_peers()->Mutable(2)->set_member_type(N);
EXPECT_EQ("config changed from index 1 to 2, "
"C (C.example.com) changed from VOTER to NON_VOTER",
@@ -150,10 +196,10 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
// Simulate a change in a pending config
{
auto before_cs = old_cs;
- AddPeer(before_cs.mutable_pending_config(), "A", RaftPeerPB::VOTER);
+ AddPeer(before_cs.mutable_pending_config(), "A", V);
auto after_cs = before_cs;
after_cs.mutable_pending_config()
- ->mutable_peers()->Mutable(0)->set_member_type(RaftPeerPB::NON_VOTER);
+ ->mutable_peers()->Mutable(0)->set_member_type(N);
EXPECT_EQ("pending config changed, A (A.example.com) changed from VOTER to NON_VOTER",
DiffConsensusStates(before_cs, after_cs));
@@ -162,9 +208,9 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
RaftConfigPB config;
- AddPeer(&config, "A", RaftPeerPB::VOTER);
- AddPeer(&config, "B", RaftPeerPB::NON_VOTER);
- AddPeer(&config, "C", RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+ AddPeer(&config, "A", V);
+ AddPeer(&config, "B", N);
+ AddPeer(&config, "C", U);
// The case when membership type is not specified. That sort of configuration
// would not pass VerifyRaftConfig(), though. Anyway, that should result
@@ -191,5 +237,371 @@ TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
ASSERT_FALSE(ReplicaTypesEqual(*peer_b, *peer_c));
}
+// Verify logic of the kudu::consensus::IsUnderReplicated.
+TEST(QuorumUtilTest, IsUnderReplicated) {
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V);
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", V);
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_FALSE(IsUnderReplicated(config, 3));
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", V, '?');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_FALSE(IsUnderReplicated(config, 3));
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", V, '-');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", N, '+');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", N, '+');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '-');
+ AddPeer(&config, "C", N, '+');
+ EXPECT_FALSE(IsUnderReplicated(config, 1));
+ EXPECT_TRUE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '-');
+ AddPeer(&config, "C", N, '+', {{"PROMOTE", true}});
+ EXPECT_FALSE(IsUnderReplicated(config, 1));
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '-');
+ AddPeer(&config, "C", N, '-', {{"PROMOTE", true}});
+ EXPECT_FALSE(IsUnderReplicated(config, 1));
+ EXPECT_TRUE(IsUnderReplicated(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+ AddPeer(&config, "D", N, '+');
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+ AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ EXPECT_FALSE(IsUnderReplicated(config, 3));
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", N, '-');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", N, '+');
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ // The replica does not have the PROMOTE attribute, so a new one is needed.
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_FALSE(IsUnderReplicated(config, 3));
+ EXPECT_TRUE(IsUnderReplicated(config, 4));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", N, '-', {{"PROMOTE", true}});
+ EXPECT_FALSE(IsUnderReplicated(config, 2));
+ EXPECT_TRUE(IsUnderReplicated(config, 3));
+ }
+}
+
+// Verify logic of the kudu::consensus::CanEvictReplica(), anticipating
+// removal of a voter replica.
+TEST(QuorumUtilTest, CanEvictReplicaVoters) {
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V);
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", V);
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", V, '-');
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '?');
+ AddPeer(&config, "B", V, '-');
+ AddPeer(&config, "C", V, '+');
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+', {{"REPLACE", true}});
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", V);
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+', {{"REPLACE", false}});
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", V, '+');
+ EXPECT_FALSE(CanEvictReplica(config, 4));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+ EXPECT_EQ("C", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", V, '+');
+ AddPeer(&config, "D", V, '+');
+ EXPECT_FALSE(CanEvictReplica(config, 4));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", V, '?', {{"REPLACE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("D", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", V, '-', {{"REPLACE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("D", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '-');
+ AddPeer(&config, "D", V, '+', {{"REPLACE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("D", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '?');
+ AddPeer(&config, "D", V, '+', {{"REPLACE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("D", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '?');
+ AddPeer(&config, "C", V, '+');
+ AddPeer(&config, "D", V, '+');
+ EXPECT_FALSE(CanEvictReplica(config, 4));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+}
+
+// Verify logic of the kudu::consensus::CanEvictReplica(), anticipating
+// removal of a non-voter replica.
+TEST(QuorumUtilTest, CanEvictReplicaNonVoters) {
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V);
+ EXPECT_FALSE(CanEvictReplica(config, 1));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ EXPECT_FALSE(CanEvictReplica(config, 1));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", N);
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", N, '+');
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("C", uuid_to_evict);
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("C", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", N, '-', {{"PROMOTE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", N, '-');
+ AddPeer(&config, "C", N);
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", N, '?');
+ AddPeer(&config, "C", N, '+');
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("B", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", N);
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ EXPECT_EQ("C", uuid_to_evict);
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '-');
+ AddPeer(&config, "B", V);
+ AddPeer(&config, "C", N);
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ EXPECT_FALSE(CanEvictReplica(config, 1));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '+');
+ AddPeer(&config, "B", V, '-');
+ AddPeer(&config, "C", N, '-', {{"PROMOTE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 2));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+ }
+ {
+ RaftConfigPB config;
+ AddPeer(&config, "A", V, '-');
+ AddPeer(&config, "B", V, '+');
+ AddPeer(&config, "C", V, '+');
+ AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+ EXPECT_FALSE(CanEvictReplica(config, 3));
+ string uuid_to_evict;
+ ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+ EXPECT_EQ("D", uuid_to_evict);
+ }
+}
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.cc b/src/kudu/consensus/quorum_util.cc
index 90d9671..e002070 100644
--- a/src/kudu/consensus/quorum_util.cc
+++ b/src/kudu/consensus/quorum_util.cc
@@ -17,6 +17,7 @@
#include "kudu/consensus/quorum_util.h"
#include <map>
+#include <ostream>
#include <set>
#include <string>
#include <utility>
@@ -35,6 +36,7 @@ using google::protobuf::RepeatedPtrField;
using kudu::pb_util::SecureShortDebugString;
using std::map;
using std::pair;
+using std::set;
using std::string;
using std::vector;
using strings::Substitute;
@@ -382,5 +384,190 @@ string DiffConsensusStates(const ConsensusStatePB& old_state,
return JoinStrings(change_strs, ", ");
}
+// The decision is based on:
+//
+// * the number of voter replicas in definitively bad shape and replicas
+// marked with the REPLACE attribute
+//
+// * the number of non-voter replicas marked with the PROMOTE=true attribute
+// in good or possibly good state.
+//
+// This is because a replica with UNKNOWN reported health state might actually
+// be in good shape. If so, then adding a new replica would lead to
+// over-provisioning. This logic assumes that a non-voter replica does not
+// stay in unknown state for eternity -- the leader replica should take care of
+// that and eventually update the health status either to 'HEALTHY' or 'FAILED'.
+//
+// TODO(aserbin): add a test scenario for the leader replica's logic to cover
+// the latter case.
+bool IsUnderReplicated(const RaftConfigPB& config, int replication_factor) {
+ int num_voters_total = 0;
+ int num_voters_need_replacement = 0;
+ int num_non_voters_to_promote = 0;
+
+ // While working with the optional fields related to per-replica health status
+ // and attributes, has_a_field()-like methods are not called because of
+ // the appropriate default values of those fields.
+ for (const RaftPeerPB& peer : config.peers()) {
+ switch (peer.member_type()) {
+ case RaftPeerPB::VOTER:
+ ++num_voters_total;
+ if (peer.attrs().replace() ||
+ peer.health_report().overall_health() == HealthReportPB::FAILED) {
+ ++num_voters_need_replacement;
+ }
+ break;
+ case RaftPeerPB::NON_VOTER:
+ if (peer.attrs().promote() &&
+ peer.health_report().overall_health() != HealthReportPB::FAILED) {
+ ++num_non_voters_to_promote;
+ }
+ break;
+ default:
+ LOG(DFATAL) << peer.member_type() << ": unsupported member type";
+ break;
+ }
+ }
+ return replication_factor > (num_voters_total - num_voters_need_replacement) +
+ num_non_voters_to_promote;
+}
+
+// Whether there is an excess replica to evict.
+bool CanEvictReplica(const RaftConfigPB& config,
+ int replication_factor,
+ string* uuid_to_evict) {
+ int num_non_voters_total = 0;
+
+ int num_voters_healthy = 0;
+ int num_voters_total = 0;
+
+ string non_voter_any;
+ string non_voter_failed;
+ string non_voter_unknown_health;
+
+ string voter_any;
+ string voter_failed;
+ string voter_replace;
+ string voter_unknown_health;
+
+ // While working with the optional fields related to per-replica health status
+ // and attributes, has_a_field()-like methods are not called because of
+ // the appropriate default values of those fields.
+ for (const RaftPeerPB& peer : config.peers()) {
+ DCHECK(peer.has_permanent_uuid() && !peer.permanent_uuid().empty());
+ const string& peer_uuid = peer.permanent_uuid();
+ const bool failed = peer.health_report().overall_health() == HealthReportPB::FAILED;
+ const bool healthy = peer.health_report().overall_health() == HealthReportPB::HEALTHY;
+ const bool unknown = !peer.has_health_report() ||
+ !peer.health_report().has_overall_health() ||
+ peer.health_report().overall_health() == HealthReportPB::UNKNOWN;
+ const bool has_replace = peer.attrs().replace();
+
+ switch (peer.member_type()) {
+ case RaftPeerPB::VOTER:
+ ++num_voters_total;
+ if (healthy && !has_replace) {
+ ++num_voters_healthy;
+ }
+ if (failed && has_replace) {
+ voter_failed = voter_replace = peer_uuid;
+ }
+ if (failed && voter_failed.empty()) {
+ voter_failed = peer_uuid;
+ }
+ if (has_replace && voter_replace.empty()) {
+ voter_replace = peer_uuid;
+ }
+ if (unknown && voter_unknown_health.empty()) {
+ voter_unknown_health = peer_uuid;
+ }
+ voter_any = peer_uuid;
+ break;
+
+ case RaftPeerPB::NON_VOTER:
+ ++num_non_voters_total;
+ if (failed && non_voter_failed.empty()) {
+ non_voter_failed = peer_uuid;
+ }
+ if (unknown && non_voter_unknown_health.empty()) {
+ non_voter_unknown_health = peer_uuid;
+ }
+ non_voter_any = peer_uuid;
+ break;
+
+ default:
+ LOG(DFATAL) << peer.member_type() << ": unsupported member type";
+ break;
+ }
+ }
+
+ // A conservative approach is used when evicting replicas.
+ //
+ // A non-voter replica may be evicted only if the number of voter replicas
+ // in good health without the REPLACE attribute is greater or equal to the
+ // specified replication factor. The idea is to avoid evicting non-voter
+ // replicas if there is uncertainty about the actual health status of the
+ // voter replicas of the tablet. This is because a present non-voter replica
+ // could be a good fit to replace a voter replica whose actual health
+ // status is not yet known.
+ //
+ // A voter replica may be evicted only if the number of voter replicas in good
+ // health without the REPLACE attribute is greater or equal to the specified
+ // replication factor. Removal of voter replicas from the tablet without exact
+ // knowledge of their health status could lead to removing the healthy ones
+ // and keeping the failed ones.
+ const bool can_evict_non_voter =
+ num_voters_healthy >= replication_factor && num_non_voters_total > 0;
+ const bool can_evict_voter =
+ num_voters_healthy > replication_factor ||
+ (num_voters_healthy == replication_factor &&
+ num_voters_total > replication_factor);
+
+ const bool can_evict = can_evict_non_voter || can_evict_voter;
+ string to_evict;
+ if (can_evict_non_voter) {
+ // First try to evict an excess non-voter, if present. This is to avoid
+ // possible IO load due to tablet copy in progress.
+ //
+ // Non-voter candidates for eviction (in decreasing priority):
+ // * failed
+ // * in unknown health state
+ // * any other
+ if (!non_voter_failed.empty()) {
+ to_evict = non_voter_failed;
+ } else if (!non_voter_unknown_health.empty()) {
+ to_evict = non_voter_unknown_health;
+ } else {
+ to_evict = non_voter_any;
+ }
+ } else if (can_evict_voter) {
+ // Next try to evict an excess voter.
+ //
+ // Voter candidates for eviction (in decreasing priority):
+ // * failed and having the attribute REPLACE set
+ // * having the attribute REPLACE set
+ // * failed
+ // * in unknown health state
+ // * any other
+ if (!voter_replace.empty()) {
+ to_evict = voter_replace;
+ } else if (!voter_failed.empty()) {
+ to_evict = voter_failed;
+ } else if (!voter_unknown_health.empty()) {
+ to_evict = voter_unknown_health;
+ } else {
+ to_evict = voter_any;
+ }
+ }
+
+ if (can_evict) {
+ DCHECK(!to_evict.empty());
+ if (uuid_to_evict) {
+ *uuid_to_evict = to_evict;
+ }
+ }
+ return can_evict;
+}
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index 3bd0afc..b0d12ae 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -88,6 +88,18 @@ std::string DiffConsensusStates(const ConsensusStatePB& old_state,
std::string DiffRaftConfigs(const RaftConfigPB& old_config,
const RaftConfigPB& new_config);
+// Return true iff the tablet is under-replicated given its Raft
+// configuration and the health status reported for its members.
+bool IsUnderReplicated(const RaftConfigPB& config, int replication_factor);
+
+// Check if the given Raft configuration contains at least one extra replica
+// which can be removed in accordance with the specified replication
+// factor. If so, then return 'true' and set the UUID of the best suited
+// replica into the 'uuid_to_evict' out parameter. Otherwise, return 'false'.
+bool CanEvictReplica(const RaftConfigPB& config,
+ int replication_factor,
+ std::string* uuid_to_evict = nullptr);
+
} // namespace consensus
} // namespace kudu
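As a usage note for the declarations above: the eviction priorities
implemented in quorum_util.cc mean a voter carrying the REPLACE attribute
is chosen ahead of a merely failed voter. A small hedged sketch mirroring
one of the unit-test cases above (same include and accessor assumptions
as the sketch in the commit message; the Spec struct is illustrative):

#include <cassert>
#include <string>

#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"

using kudu::consensus::CanEvictReplica;
using kudu::consensus::HealthReportPB;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;

int main() {
  struct Spec {
    const char* uuid;
    HealthReportPB::HealthStatus health;
    bool replace;
  };
  RaftConfigPB config;
  // Four voters: two healthy, one failed, one healthy but marked REPLACE.
  for (const Spec& s : { Spec{"A", HealthReportPB::HEALTHY, false},
                         Spec{"B", HealthReportPB::HEALTHY, false},
                         Spec{"C", HealthReportPB::FAILED,  false},
                         Spec{"D", HealthReportPB::HEALTHY, true} }) {
    RaftPeerPB* peer = config.add_peers();
    peer->set_permanent_uuid(s.uuid);
    peer->set_member_type(RaftPeerPB::VOTER);
    peer->mutable_health_report()->set_overall_health(s.health);
    peer->mutable_attrs()->set_replace(s.replace);
  }

  // With replication factor 2 there are enough healthy voters (A, B) to
  // allow an eviction; the REPLACE-marked voter D outranks the failed
  // voter C in the eviction ordering.
  std::string to_evict;
  assert(CanEvictReplica(config, 2, &to_evict));
  assert(to_evict == "D");

  // At replication factor 3 only two voters are healthy and not marked
  // REPLACE, so no replica may be evicted yet.
  assert(!CanEvictReplica(config, 3));
  return 0;
}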
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index a190ebe..45dcc2b 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -57,6 +57,7 @@ METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalMaster;
using kudu::cluster::ExternalTabletServer;
using kudu::consensus::RaftPeerPB;
using kudu::itest::AddServer;
@@ -305,7 +306,7 @@ TEST_F(RaftConsensusNonVoterITest, AddNonVoterReplica) {
// The master should report about the newly added NON_VOTER tablet replica
// to the established leader.
bool has_leader;
- master::TabletLocationsPB tablet_locations;
+ TabletLocationsPB tablet_locations;
ASSERT_OK(WaitForReplicasReportedToMaster(
cluster_->master_proxy(), kOriginalReplicasNum + 1, tablet_id, kTimeout,
WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -429,7 +430,7 @@ TEST_F(RaftConsensusNonVoterITest, AddThenRemoveNonVoterReplica) {
// should report appropriate replica count at this point. The tablet leader
// should be established.
bool has_leader;
- master::TabletLocationsPB tablet_locations;
+ TabletLocationsPB tablet_locations;
ASSERT_OK(WaitForReplicasReportedToMaster(
cluster_->master_proxy(), kOriginalReplicasNum, tablet_id, kTimeout,
WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -745,7 +746,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
// The removed tablet replica should be gone, and the master should report
// appropriate replica count at this point.
bool has_leader;
- master::TabletLocationsPB tablet_locations;
+ TabletLocationsPB tablet_locations;
ASSERT_OK(WaitForReplicasReportedToMaster(
cluster_->master_proxy(), kInitialReplicasNum, tablet_id, kTimeout,
WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -893,5 +894,151 @@ TEST_F(RaftConsensusNonVoterITest, PromotedReplicaCanVote) {
ASSERT_EQ(new_replica_uuid, leader->uuid());
}
+// Add an extra non-voter replica to the tablet and make sure it's evicted
+// by the catalog manager once it sees the updated tablet state.
+TEST_F(RaftConsensusNonVoterITest, CatalogManagerEvictsExcessNonVoter) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const int kReplicaUnavailableSec = 5;
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+ const int kReplicasNum = 3;
+ FLAGS_num_replicas = kReplicasNum;
+ // Need one extra tserver for a new non-voter replica.
+ FLAGS_num_tablet_servers = kReplicasNum + 1;
+ const vector<string> kMasterFlags = {
+ // The scenario runs with the new replica management scheme.
+ "--raft_prepare_replacement_before_eviction=true",
+ // Don't evict excess replicas to avoid races in the scenario.
+ "--catalog_manager_evict_excess_replicas=false",
+ };
+ const vector<string> kTserverFlags = {
+ // The scenario runs with the new replica management scheme.
+ "--raft_prepare_replacement_before_eviction=true",
+ Substitute("--consensus_rpc_timeout_ms=$0", 1000 * kReplicaUnavailableSec),
+ Substitute("--follower_unavailable_considered_failed_sec=$0",
+ kReplicaUnavailableSec),
+ };
+
+ NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+ ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size());
+ ASSERT_EQ(kReplicasNum, tablet_replicas_.size());
+
+ TabletServerMap replica_servers;
+ for (const auto& e : tablet_replicas_) {
+ if (e.first == tablet_id_) {
+ replica_servers.emplace(e.second->uuid(), e.second);
+ }
+ }
+ ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+ TServerDetails* new_replica = nullptr;
+ for (const auto& ts : tablet_servers_) {
+ if (replica_servers.find(ts.first) == replica_servers.end()) {
+ new_replica = ts.second;
+ break;
+ }
+ }
+ ASSERT_NE(nullptr, new_replica);
+ ASSERT_OK(AddReplica(tablet_id_, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+ bool has_leader = false;
+ TabletLocationsPB tablet_locations;
+ // Make sure the extra replica is seen by the master.
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ kReplicasNum + 1,
+ tablet_id_,
+ kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader,
+ &tablet_locations));
+
+ // Switch the catalog manager to start evicting excess replicas
+ // (that's how it runs by default in the new replica management scheme).
+ // Enabling it earlier might lead to a race, since the newly added
+ // non-voter replica might be evicted before it's spotted by the
+ // WaitForReplicasReportedToMaster() call above.
+ for (auto i = 0; i < cluster_->num_masters(); ++i) {
+ ExternalMaster* m = cluster_->master(i);
+ ASSERT_OK(cluster_->SetFlag(m,
+ "catalog_manager_evict_excess_replicas", "true"));
+ }
+
+ ExternalTabletServer* new_replica_ts =
+ cluster_->tablet_server_by_uuid(new_replica->uuid());
+ ASSERT_NE(nullptr, new_replica_ts);
+ ASSERT_OK(new_replica_ts->Pause());
+ SCOPED_CLEANUP({
+ ASSERT_OK(new_replica_ts->Resume());
+ });
+ SleepFor(MonoDelta::FromSeconds(2 * kReplicaUnavailableSec));
+ ASSERT_OK(new_replica_ts->Resume());
+
+ // Make sure the excess non-voter replica is gone.
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ kReplicasNum,
+ tablet_id_,
+ kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader,
+ &tablet_locations));
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// Check that the catalog manager adds a non-voter replica to replace a
+// failed voter replica in a tablet.
+//
+// TODO(aserbin): make it run for 5 tablet servers as well.
+TEST_F(RaftConsensusNonVoterITest, CatalogManagerAddsNonVoter) {
+ if (!AllowSlowTests()) {
+ LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+ return;
+ }
+
+ const int kReplicaUnavailableSec = 10;
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(6 * kReplicaUnavailableSec);
+ const int kReplicasNum = 3;
+ FLAGS_num_replicas = kReplicasNum;
+ // Need one extra tserver after the tserver with one of the replicas is stopped.
+ // Otherwise, the catalog manager would not be able to spawn a new non-voter
+ // replacement replica.
+ FLAGS_num_tablet_servers = kReplicasNum + 1;
+ const vector<string> kMasterFlags = {
+ // The scenario runs with the new replica management scheme.
+ "--raft_prepare_replacement_before_eviction=true",
+ // Don't evict excess replicas to avoid races in the scenario.
+ "--catalog_manager_evict_excess_replicas=false",
+ };
+ const vector<string> kTserverFlags = {
+ // The scenario runs with the new replica management scheme.
+ "--raft_prepare_replacement_before_eviction=true",
+ Substitute("--follower_unavailable_considered_failed_sec=$0",
+ kReplicaUnavailableSec),
+ };
+
+ NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+ ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size());
+ ASSERT_EQ(kReplicasNum, tablet_replicas_.size());
+
+ ExternalTabletServer* ts0 = cluster_->tablet_server(0);
+ ASSERT_NE(nullptr, ts0);
+ ts0->Shutdown();
+
+ // Wait for a new non-voter replica added by the catalog manager to
+ // replace the failed one.
+ bool has_leader = false;
+ TabletLocationsPB tablet_locations;
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ kReplicasNum + 1,
+ tablet_id_,
+ kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader,
+ &tablet_locations));
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
+
} // namespace tserver
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6702750..5131bd3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -60,6 +60,7 @@
#include <boost/function.hpp>
#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/cfile/type_encodings.h"
@@ -225,6 +226,41 @@ DEFINE_int32(catalog_manager_inject_latency_prior_tsk_write_ms, 0,
TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, hidden);
TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, unsafe);
+DEFINE_bool(catalog_manager_evict_excess_replicas, true,
+ "Whether catalog manager evicts excess replicas from tablet "
+ "configuration based on replication factor.");
+TAG_FLAG(catalog_manager_evict_excess_replicas, hidden);
+TAG_FLAG(catalog_manager_evict_excess_replicas, runtime);
+
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
+using base::subtle::NoBarrier_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
+using kudu::cfile::TypeEncodingInfo;
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::GetConsensusRole;
+using kudu::consensus::IsRaftConfigMember;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::consensus::StartTabletCopyRequestPB;
+using kudu::consensus::kMinimumTerm;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcContext;
+using kudu::security::Cert;
+using kudu::security::DataFormat;
+using kudu::security::PrivateKey;
+using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
+using kudu::security::TokenSigningPrivateKeyPB;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletStatePB;
+using kudu::tserver::TabletServerErrorPB;
using std::pair;
using std::set;
using std::shared_ptr;
@@ -233,38 +269,11 @@ using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
+using strings::Substitute;
namespace kudu {
namespace master {
-using base::subtle::NoBarrier_CompareAndSwap;
-using base::subtle::NoBarrier_Load;
-using cfile::TypeEncodingInfo;
-using consensus::ConsensusServiceProxy;
-using consensus::ConsensusStatePB;
-using consensus::GetConsensusRole;
-using consensus::IsRaftConfigMember;
-using consensus::RaftConsensus;
-using consensus::RaftPeerPB;
-using consensus::StartTabletCopyRequestPB;
-using consensus::kMinimumTerm;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
-using rpc::RpcContext;
-using security::Cert;
-using security::DataFormat;
-using security::PrivateKey;
-using security::TokenSigner;
-using security::TokenSigningPrivateKey;
-using security::TokenSigningPrivateKeyPB;
-using strings::Substitute;
-using tablet::TABLET_DATA_DELETED;
-using tablet::TABLET_DATA_TOMBSTONED;
-using tablet::TabletDataState;
-using tablet::TabletReplica;
-using tablet::TabletStatePB;
-using tserver::TabletServerErrorPB;
-
////////////////////////////////////////////////////////////
// Table Loader
////////////////////////////////////////////////////////////
@@ -2983,67 +2992,139 @@ class AsyncAlterTable : public RetryingTSRpcTask {
tserver::AlterSchemaResponsePB resp_;
};
-class AsyncAddServerTask : public RetryingTSRpcTask {
+class AsyncChangeConfigTask : public RetryingTSRpcTask {
public:
- AsyncAddServerTask(Master *master,
- const scoped_refptr<TabletInfo>& tablet,
- ConsensusStatePB cstate,
- ThreadSafeRandom* rng)
- : RetryingTSRpcTask(master,
- gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
- tablet->table()),
- tablet_(tablet),
- cstate_(std::move(cstate)),
- rng_(rng) {
- deadline_ = MonoTime::Max(); // Never time out.
- }
-
- string type_name() const override { return "AddServer ChangeConfig"; }
+ AsyncChangeConfigTask(Master* master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ consensus::ChangeConfigType change_config_type);
string description() const override;
protected:
- bool SendRequest(int attempt) override;
void HandleResponse(int attempt) override;
-
- private:
- string tablet_id() const override { return tablet_->id(); }
+ bool CheckOpIdIndex();
const scoped_refptr<TabletInfo> tablet_;
const ConsensusStatePB cstate_;
+ const consensus::ChangeConfigType change_config_type_;
- // Used to make random choices in replica selection.
- ThreadSafeRandom* rng_;
-
- consensus::ChangeConfigRequestPB req_;
consensus::ChangeConfigResponsePB resp_;
+
+ private:
+ string tablet_id() const override { return tablet_->id(); }
};
-string AsyncAddServerTask::description() const {
- return Substitute("AddServer ChangeConfig RPC for tablet $0 "
- "with cas_config_opid_index $1",
+AsyncChangeConfigTask::AsyncChangeConfigTask(Master* master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ consensus::ChangeConfigType change_config_type)
+ : RetryingTSRpcTask(master,
+ gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
+ tablet->table()),
+ tablet_(std::move(tablet)),
+ cstate_(std::move(cstate)),
+ change_config_type_(change_config_type) {
+ deadline_ = MonoTime::Max(); // Never time out.
+ }
+
+string AsyncChangeConfigTask::description() const {
+ return Substitute("$0 RPC for tablet $1 with cas_config_opid_index $2",
+ type_name(),
tablet_->id(),
cstate_.committed_config().opid_index());
}
-bool AsyncAddServerTask::SendRequest(int attempt) {
- LOG(INFO) << Substitute("Sending request for AddServer on tablet $0 (attempt $1)",
- tablet_->id(), attempt);
+void AsyncChangeConfigTask::HandleResponse(int attempt) {
+ if (!resp_.has_error()) {
+ MarkComplete();
+ LOG_WITH_PREFIX(INFO) << Substitute("$0 succeeded (attempt $1)",
+ type_name(), attempt);
+ return;
+ }
- // Bail if we're retrying in vain.
+ Status status = StatusFromPB(resp_.error().status());
+
+ // Do not retry on a CAS error, otherwise retry forever or until cancelled.
+ switch (resp_.error().code()) {
+ case TabletServerErrorPB::CAS_FAILED:
+ LOG_WITH_PREFIX(WARNING) << Substitute("$0 failed with leader $1 "
+ "due to CAS failure; no further retry: $2",
+ type_name(), target_ts_desc_->ToString(),
+ status.ToString());
+ MarkFailed();
+ break;
+ default:
+ LOG_WITH_PREFIX(INFO) << Substitute("$0 failed with leader $1 "
+ "due to error $2; will retry: $3",
+ type_name(), target_ts_desc_->ToString(),
+ TabletServerErrorPB::Code_Name(resp_.error().code()), status.ToString());
+ break;
+ }
+}
+
+bool AsyncChangeConfigTask::CheckOpIdIndex() {
int64_t latest_index;
{
TabletMetadataLock tablet_lock(tablet_.get(), LockMode::READ);
latest_index = tablet_lock.data().pb.consensus_state()
- .committed_config().opid_index();
+ .committed_config().opid_index();
}
if (latest_index > cstate_.committed_config().opid_index()) {
- LOG_WITH_PREFIX(INFO) << Substitute("Latest config for has opid_index of $0 "
- "while this task has opid_index of $1. Aborting task",
+ LOG_WITH_PREFIX(INFO) << Substitute("aborting the task: "
+ "latest config opid_index $0; task opid_index $1",
latest_index, cstate_.committed_config().opid_index());
MarkAborted();
return false;
}
+ return true;
+}
+
+class AsyncAddReplicaTask : public AsyncChangeConfigTask {
+ public:
+ AsyncAddReplicaTask(Master* master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ RaftPeerPB::MemberType member_type,
+ ThreadSafeRandom* rng);
+
+ string type_name() const override;
+
+ protected:
+ bool SendRequest(int attempt) override;
+
+ private:
+ const RaftPeerPB::MemberType member_type_;
+
+ // Used to make random choices in replica selection.
+ ThreadSafeRandom* rng_;
+};
+
+AsyncAddReplicaTask::AsyncAddReplicaTask(Master* master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ RaftPeerPB::MemberType member_type,
+ ThreadSafeRandom* rng)
+ : AsyncChangeConfigTask(master, std::move(tablet), std::move(cstate),
+ consensus::ADD_SERVER),
+ member_type_(member_type),
+ rng_(rng) {
+}
+
+string AsyncAddReplicaTask::type_name() const {
+ return Substitute("ChangeConfig:$0:$1",
+ consensus::ChangeConfigType_Name(change_config_type_),
+ RaftPeerPB::MemberType_Name(member_type_));
+}
+
+bool AsyncAddReplicaTask::SendRequest(int attempt) {
+ // Bail if we're retrying in vain.
+ if (!CheckOpIdIndex()) {
+ return false;
+ }
+
+ LOG(INFO) << Substitute("Sending $0 on tablet $1 (attempt $2)",
+ type_name(), tablet_->id(), attempt);
// Select the replica we wish to add to the config.
// Do not include current members of the config.
@@ -3063,48 +3144,76 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
return false;
}
- req_.set_dest_uuid(target_ts_desc_->permanent_uuid());
- req_.set_tablet_id(tablet_->id());
- req_.set_type(consensus::ADD_SERVER);
- req_.set_cas_config_opid_index(cstate_.committed_config().opid_index());
- RaftPeerPB* peer = req_.mutable_server();
+ consensus::ChangeConfigRequestPB req;
+ req.set_dest_uuid(target_ts_desc_->permanent_uuid());
+ req.set_tablet_id(tablet_->id());
+ req.set_type(consensus::ADD_SERVER);
+ req.set_cas_config_opid_index(cstate_.committed_config().opid_index());
+ RaftPeerPB* peer = req.mutable_server();
peer->set_permanent_uuid(replacement_replica->permanent_uuid());
ServerRegistrationPB peer_reg;
replacement_replica->GetRegistration(&peer_reg);
CHECK_GT(peer_reg.rpc_addresses_size(), 0);
*peer->mutable_last_known_addr() = peer_reg.rpc_addresses(0);
- peer->set_member_type(RaftPeerPB::VOTER);
- VLOG(1) << Substitute("Sending AddServer ChangeConfig request to $0: $1",
- target_ts_desc_->ToString(), SecureDebugString(req_));
- consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_,
- boost::bind(&AsyncAddServerTask::RpcCallback, this));
+ peer->set_member_type(member_type_);
+ VLOG(1) << Substitute("Sending $0 request to $1: $2",
+ type_name(), target_ts_desc_->ToString(), SecureDebugString(req));
+ consensus_proxy_->ChangeConfigAsync(req, &resp_, &rpc_,
+ boost::bind(&AsyncAddReplicaTask::RpcCallback, this));
return true;
}
-void AsyncAddServerTask::HandleResponse(int attempt) {
- if (!resp_.has_error()) {
- MarkComplete();
- LOG_WITH_PREFIX(INFO) << "Config change to add server succeeded";
- return;
- }
+class AsyncEvictReplicaTask : public AsyncChangeConfigTask {
+ public:
+ AsyncEvictReplicaTask(Master *master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ string peer_uuid_to_evict);
- Status status = StatusFromPB(resp_.error().status());
+ string type_name() const override;
- // Do not retry on a CAS error, otherwise retry forever or until cancelled.
- switch (resp_.error().code()) {
- case TabletServerErrorPB::CAS_FAILED:
- LOG_WITH_PREFIX(WARNING) << Substitute("ChangeConfig() failed with leader $0 "
- "due to CAS failure. No further retry: $1",
- target_ts_desc_->ToString(), status.ToString());
- MarkFailed();
- break;
- default:
- LOG_WITH_PREFIX(INFO) << Substitute("ChangeConfig() failed with leader $0 "
- "due to error $1. This operation will be retried. Error detail: $2",
- target_ts_desc_->ToString(),
- TabletServerErrorPB::Code_Name(resp_.error().code()), status.ToString());
- break;
+ protected:
+ bool SendRequest(int attempt) override;
+
+ private:
+ const string peer_uuid_to_evict_;
+};
+
+AsyncEvictReplicaTask::AsyncEvictReplicaTask(Master* master,
+ scoped_refptr<TabletInfo> tablet,
+ ConsensusStatePB cstate,
+ string peer_uuid_to_evict)
+ : AsyncChangeConfigTask(master, std::move(tablet), std::move(cstate),
+ consensus::REMOVE_SERVER),
+ peer_uuid_to_evict_(std::move(peer_uuid_to_evict)) {
+}
+
+string AsyncEvictReplicaTask::type_name() const {
+ return Substitute("ChangeConfig:$0",
+ consensus::ChangeConfigType_Name(change_config_type_));
+}
+
+bool AsyncEvictReplicaTask::SendRequest(int attempt) {
+ // Bail if we're retrying in vain.
+ if (!CheckOpIdIndex()) {
+ return false;
}
+
+ LOG(INFO) << Substitute("Sending $0 on tablet $1 (attempt $2)",
+ type_name(), tablet_->id(), attempt);
+
+ consensus::ChangeConfigRequestPB req;
+ req.set_dest_uuid(target_ts_desc_->permanent_uuid());
+ req.set_tablet_id(tablet_->id());
+ req.set_type(consensus::REMOVE_SERVER);
+ req.set_cas_config_opid_index(cstate_.committed_config().opid_index());
+ RaftPeerPB* peer = req.mutable_server();
+ peer->set_permanent_uuid(peer_uuid_to_evict_);
+ VLOG(1) << Substitute("Sending $0 request to $1: $2",
+ type_name(), target_ts_desc_->ToString(), SecureDebugString(req));
+ consensus_proxy_->ChangeConfigAsync(req, &resp_, &rpc_,
+ boost::bind(&AsyncEvictReplicaTask::RpcCallback, this));
+ return true;
}
Status CatalogManager::ProcessTabletReport(
@@ -3265,6 +3374,8 @@ Status CatalogManager::ProcessTabletReport(
continue;
}
+ const auto replication_factor = table->metadata().state().pb.num_replicas();
+ bool consensus_state_updated = false;
// 7. Process the report's consensus state. There may be one even when the
// replica has been tombstoned.
if (report.has_consensus_state()) {
@@ -3306,11 +3417,12 @@ Status CatalogManager::ProcessTabletReport(
// the committed config's opid_index).
// - The new cstate has a leader, and either the old cstate didn't, or
// there was a term change.
- if (cstate.committed_config().opid_index() > prev_cstate.committed_config().opid_index() ||
+ consensus_state_updated = (cstate.committed_config().opid_index() >
+ prev_cstate.committed_config().opid_index()) ||
(!cstate.leader_uuid().empty() &&
(prev_cstate.leader_uuid().empty() ||
- cstate.current_term() > prev_cstate.current_term()))) {
-
+ cstate.current_term() > prev_cstate.current_term()));
+ if (consensus_state_updated) {
// 7d(i). Retain knowledge of the leader even if it wasn't reported in
// the latest config.
//
@@ -3326,13 +3438,12 @@ Status CatalogManager::ProcessTabletReport(
} else if (!cstate.leader_uuid().empty() &&
!prev_cstate.leader_uuid().empty() &&
cstate.leader_uuid() != prev_cstate.leader_uuid()) {
- string msg = Substitute("Previously reported cstate for tablet $0 gave "
+ LOG(DFATAL) << Substitute("Previously reported cstate for tablet $0 gave "
"a different leader for term $1 than the current cstate. "
"Previous cstate: $2. Current cstate: $3.",
tablet->ToString(), cstate.current_term(),
SecureShortDebugString(prev_cstate),
SecureShortDebugString(cstate));
- LOG(DFATAL) << msg;
continue;
}
}
@@ -3368,15 +3479,37 @@ Status CatalogManager::ProcessTabletReport(
}
}
}
+ }
- // 7d(iv). Add a server to the config if it is under-replicated.
+ // 7e. Make a tablet configuration change depending on the mode the server
+ // is running in. The choice between the two alternative modes is controlled
+ // by the 'raft_prepare_replacement_before_eviction' run-time flag.
+ if (FLAGS_raft_prepare_replacement_before_eviction) {
+ // An alternative scheme of managing tablet replicas: the catalog
+ // manager processes the health-related info on replicas from the tablet
+ // report and initiates appropriate modifications for the tablet Raft
+ // configuration: evict an already-replaced failed voter replica or add
+ // a new non-voter replica marked for promotion as a replacement.
+ const RaftConfigPB& config = cstate.committed_config();
+ string to_evict;
+ if (IsUnderReplicated(config, replication_factor)) {
+ rpcs.emplace_back(new AsyncAddReplicaTask(
+ master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
+ } else if (PREDICT_TRUE(FLAGS_catalog_manager_evict_excess_replicas) &&
+ CanEvictReplica(config, replication_factor, &to_evict)) {
+ DCHECK(!to_evict.empty());
+ rpcs.emplace_back(new AsyncEvictReplicaTask(
+ master_, tablet, cstate, std::move(to_evict)));
+ }
+ } else if (consensus_state_updated &&
+ FLAGS_master_add_server_when_underreplicated &&
+ CountVoters(cstate.committed_config()) < replication_factor) {
+ // Add a server to the config if it is under-replicated.
//
// This is an idempotent operation due to a CAS enforced on the
// committed config's opid_index.
- if (FLAGS_master_add_server_when_underreplicated &&
- CountVoters(cstate.committed_config()) < table->metadata().state().pb.num_replicas()) {
- rpcs.emplace_back(new AsyncAddServerTask(master_, tablet, cstate, &rng_));
- }
+ rpcs.emplace_back(new AsyncAddReplicaTask(
+ master_, tablet, cstate, RaftPeerPB::VOTER, &rng_));
}
}
@@ -3820,7 +3953,7 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc
ConsensusStatePB* cstate = tablet->mutable_metadata()->mutable_dirty()
->pb.mutable_consensus_state();
cstate->set_current_term(kMinimumTerm);
- consensus::RaftConfigPB *config = cstate->mutable_committed_config();
+ RaftConfigPB *config = cstate->mutable_committed_config();
// Maintain ability to downgrade Kudu to a version with LocalConsensus.
if (nreplicas == 1) {
@@ -3836,7 +3969,7 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc
void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& tablet,
const TabletMetadataLock& tablet_lock) {
- const consensus::RaftConfigPB& config =
+ const RaftConfigPB& config =
tablet_lock.data().pb.consensus_state().committed_config();
tablet->set_last_create_tablet_time(MonoTime::Now());
for (const RaftPeerPB& peer : config.peers()) {
@@ -3850,7 +3983,7 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
int nreplicas,
- consensus::RaftConfigPB *config) {
+ RaftConfigPB *config) {
DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: " << SecureShortDebugString(*config);
DCHECK_LE(nreplicas, ts_descs.size());
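Condensed, the new step 7e reduces to a single decision: add a NON_VOTER
when the tablet is under-replicated, otherwise evict when the flag allows
it. A hedged restatement for illustration only (the actual code lives in
CatalogManager::ProcessTabletReport and schedules AsyncAddReplicaTask /
AsyncEvictReplicaTask rather than returning a value; ConfigAction and
DecideConfigChange() are hypothetical names):

#include <string>

#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/quorum_util.h"

enum class ConfigAction { kNone, kAddNonVoter, kEvictReplica };

// Mirrors the order of checks in step 7e: under-replication takes
// precedence over eviction, and eviction is additionally gated on the
// catalog_manager_evict_excess_replicas flag.
ConfigAction DecideConfigChange(const kudu::consensus::RaftConfigPB& config,
                                int replication_factor,
                                bool evict_excess_replicas,
                                std::string* uuid_to_evict) {
  if (kudu::consensus::IsUnderReplicated(config, replication_factor)) {
    return ConfigAction::kAddNonVoter;  // the new replica gets PROMOTE=true
  }
  if (evict_excess_replicas &&
      kudu::consensus::CanEvictReplica(config, replication_factor,
                                       uuid_to_evict)) {
    return ConfigAction::kEvictReplica;
  }
  return ConfigAction::kNone;
}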
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b25a655..5f406ad 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -797,7 +797,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
uint32_t version);
// Send the "create tablet request" to all peers of a particular tablet.
- //.
+ //
// The creation is async, and at the moment there is no error checking on the
// caller side. We rely on the assignment timeout. If we don't see the tablet
// after the timeout, we regenerate a new one and proceed with a new