Posted to commits@kudu.apache.org by al...@apache.org on 2017/11/27 06:18:47 UTC

kudu git commit: KUDU-1097: 3-4-3 behavior for catalog manager

Repository: kudu
Updated Branches:
  refs/heads/master 88e39bad1 -> 87dcaf34d


KUDU-1097: 3-4-3 behavior for catalog manager

Updated the catalog_manager to behave as specified in the 3-4-3
re-replication improvement v1 design implementation plan. This
update includes the changes referred to as 'patch 2' and 'patch 4'.

Change-Id: I6f0469ac641bf7a03dbef01eaa3f1b58a5bf5d27
Reviewed-on: http://gerrit.cloudera.org:8080/8619
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>

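For context: with the --raft_prepare_replacement_before_eviction flag enabled,
the catalog manager now derives per-tablet Raft configuration changes from the
replica health information in tablet reports. Below is a minimal sketch of that
decision, paraphrasing the ProcessTabletReport() changes in the
catalog_manager.cc hunk further down; the DecideConfigChange() helper and the
enum are illustrative only and not part of this patch.

  #include <string>

  #include "kudu/consensus/metadata.pb.h"  // RaftConfigPB (assumed location)
  #include "kudu/consensus/quorum_util.h"  // IsUnderReplicated(), CanEvictReplica()

  namespace kudu {
  namespace master {

  enum class ConfigChange { kNone, kAddNonVoter, kEvictReplica };

  // Sketch of the per-tablet decision: if the config is under-replicated, add
  // a NON_VOTER replica (marked for later promotion to VOTER); otherwise, if
  // eviction is allowed, evict the excess replica picked by CanEvictReplica().
  ConfigChange DecideConfigChange(const consensus::RaftConfigPB& config,
                                  int replication_factor,
                                  bool evict_excess_replicas,
                                  std::string* uuid_to_evict) {
    if (consensus::IsUnderReplicated(config, replication_factor)) {
      return ConfigChange::kAddNonVoter;
    }
    if (evict_excess_replicas &&
        consensus::CanEvictReplica(config, replication_factor, uuid_to_evict)) {
      return ConfigChange::kEvictReplica;
    }
    return ConfigChange::kNone;
  }

  }  // namespace master
  }  // namespace kudu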

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/87dcaf34
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/87dcaf34
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/87dcaf34

Branch: refs/heads/master
Commit: 87dcaf34d3495f87164b80e03b757efc12be2fe0
Parents: 88e39ba
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Nov 20 12:04:40 2017 -0800
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Mon Nov 27 06:17:45 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/quorum_util-test.cc          | 442 ++++++++++++++++++-
 src/kudu/consensus/quorum_util.cc               | 187 ++++++++
 src/kudu/consensus/quorum_util.h                |  12 +
 .../raft_consensus_nonvoter-itest.cc            | 153 ++++++-
 src/kudu/master/catalog_manager.cc              | 343 +++++++++-----
 src/kudu/master/catalog_manager.h               |   2 +-
 6 files changed, 1015 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util-test.cc b/src/kudu/consensus/quorum_util-test.cc
index 8864977..cede1db 100644
--- a/src/kudu/consensus/quorum_util-test.cc
+++ b/src/kudu/consensus/quorum_util-test.cc
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include <string>
+#include <utility>
+#include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
@@ -25,26 +29,68 @@
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
 namespace kudu {
 namespace consensus {
 
-using std::string;
+// Handy notation of membership types used by AddPeer(), etc.
+constexpr auto N = RaftPeerPB::NON_VOTER;
+constexpr auto U = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
+constexpr auto V = RaftPeerPB::VOTER;
+
+typedef std::pair<string, bool> Attr;
 
 // Add a consensus peer into the specified configuration.
 static void AddPeer(RaftConfigPB* config,
                     const string& uuid,
-                    RaftPeerPB::MemberType type) {
+                    RaftPeerPB::MemberType type,
+                    boost::optional<char> overall_health = boost::none,
+                    vector<Attr> attrs = {}) {
   RaftPeerPB* peer = config->add_peers();
   peer->set_permanent_uuid(uuid);
   peer->mutable_last_known_addr()->set_host(uuid + ".example.com");
   peer->set_member_type(type);
+  if (overall_health) {
+    unique_ptr<HealthReportPB> health_report(new HealthReportPB);
+    switch (*overall_health) {
+      case '+':
+        health_report->set_overall_health(HealthReportPB::HEALTHY);
+        break;
+      case '-':
+        health_report->set_overall_health(HealthReportPB::FAILED);
+        break;
+      case '?':
+        health_report->set_overall_health(HealthReportPB::UNKNOWN);
+        break;
+      default:
+        FAIL() << *overall_health << ": unexpected replica health status";
+        break;
+    }
+    peer->set_allocated_health_report(health_report.release());
+  }
+  if (!attrs.empty()) {
+    unique_ptr<RaftPeerAttrsPB> attrs_pb(new RaftPeerAttrsPB);
+    for (const auto& attr : attrs) {
+      if (attr.first == "PROMOTE") {
+        attrs_pb->set_promote(attr.second);
+      } else if (attr.first == "REPLACE") {
+        attrs_pb->set_replace(attr.second);
+      } else {
+        FAIL() << attr.first << ": unexpected attribute to set";
+      }
+    }
+    peer->set_allocated_attrs(attrs_pb.release());
+  }
 }
 
 TEST(QuorumUtilTest, TestMemberExtraction) {
   RaftConfigPB config;
-  AddPeer(&config, "A", RaftPeerPB::VOTER);
-  AddPeer(&config, "B", RaftPeerPB::VOTER);
-  AddPeer(&config, "C", RaftPeerPB::VOTER);
+  AddPeer(&config, "A", V);
+  AddPeer(&config, "B", V);
+  AddPeer(&config, "C", V);
 
   // Basic test for GetRaftConfigMember().
   RaftPeerPB* peer_pb;
@@ -65,9 +111,9 @@ TEST(QuorumUtilTest, TestMemberExtraction) {
 
 TEST(QuorumUtilTest, TestDiffConsensusStates) {
   ConsensusStatePB old_cs;
-  AddPeer(old_cs.mutable_committed_config(), "A", RaftPeerPB::VOTER);
-  AddPeer(old_cs.mutable_committed_config(), "B", RaftPeerPB::VOTER);
-  AddPeer(old_cs.mutable_committed_config(), "C", RaftPeerPB::VOTER);
+  AddPeer(old_cs.mutable_committed_config(), "A", V);
+  AddPeer(old_cs.mutable_committed_config(), "B", V);
+  AddPeer(old_cs.mutable_committed_config(), "C", V);
   old_cs.set_current_term(1);
   old_cs.set_leader_uuid("A");
   old_cs.mutable_committed_config()->set_opid_index(1);
@@ -102,7 +148,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
   {
     auto new_cs = old_cs;
     new_cs.mutable_committed_config()->set_opid_index(2);
-    AddPeer(new_cs.mutable_committed_config(), "D", RaftPeerPB::NON_VOTER);
+    AddPeer(new_cs.mutable_committed_config(), "D", N);
 
     EXPECT_EQ("config changed from index 1 to 2, "
               "NON_VOTER D (D.example.com) added",
@@ -114,7 +160,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
     auto new_cs = old_cs;
     new_cs.mutable_committed_config()->set_opid_index(2);
     new_cs.mutable_committed_config()
-      ->mutable_peers()->Mutable(2)->set_member_type(RaftPeerPB::NON_VOTER);
+      ->mutable_peers()->Mutable(2)->set_member_type(N);
 
     EXPECT_EQ("config changed from index 1 to 2, "
               "C (C.example.com) changed from VOTER to NON_VOTER",
@@ -150,10 +196,10 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
   // Simulate a change in a pending config
   {
     auto before_cs = old_cs;
-    AddPeer(before_cs.mutable_pending_config(), "A", RaftPeerPB::VOTER);
+    AddPeer(before_cs.mutable_pending_config(), "A", V);
     auto after_cs = before_cs;
     after_cs.mutable_pending_config()
-      ->mutable_peers()->Mutable(0)->set_member_type(RaftPeerPB::NON_VOTER);
+      ->mutable_peers()->Mutable(0)->set_member_type(N);
 
     EXPECT_EQ("pending config changed, A (A.example.com) changed from VOTER to NON_VOTER",
               DiffConsensusStates(before_cs, after_cs));
@@ -162,9 +208,9 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
 
 TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
   RaftConfigPB config;
-  AddPeer(&config, "A", RaftPeerPB::VOTER);
-  AddPeer(&config, "B", RaftPeerPB::NON_VOTER);
-  AddPeer(&config, "C", RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+  AddPeer(&config, "A", V);
+  AddPeer(&config, "B", N);
+  AddPeer(&config, "C", U);
 
   // The case when membership type is not specified. That sort of configuration
   // would not pass VerifyRaftConfig(), though. Anyway, that should result
@@ -191,5 +237,371 @@ TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
   ASSERT_FALSE(ReplicaTypesEqual(*peer_b, *peer_c));
 }
 
+// Verify logic of the kudu::consensus::IsUnderReplicated.
+TEST(QuorumUtilTest, IsUnderReplicated) {
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V);
+    AddPeer(&config, "B", V);
+    AddPeer(&config, "C", V);
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_FALSE(IsUnderReplicated(config, 3));
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", V, '?');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_FALSE(IsUnderReplicated(config, 3));
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", V, '-');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", N, '+');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", N, '+');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", N, '+');
+    EXPECT_FALSE(IsUnderReplicated(config, 1));
+    EXPECT_TRUE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", N, '+', {{"PROMOTE", true}});
+    EXPECT_FALSE(IsUnderReplicated(config, 1));
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", N, '-', {{"PROMOTE", true}});
+    EXPECT_FALSE(IsUnderReplicated(config, 1));
+    EXPECT_TRUE(IsUnderReplicated(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+    AddPeer(&config, "D", N, '+');
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+', {{"REPLACE", true}});
+    AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+    EXPECT_FALSE(IsUnderReplicated(config, 3));
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", N, '-');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", N, '+');
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    // The replica does not have the PROMOTE attribute, so a new one is needed.
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_FALSE(IsUnderReplicated(config, 3));
+    EXPECT_TRUE(IsUnderReplicated(config, 4));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", N, '-', {{"PROMOTE", true}});
+    EXPECT_FALSE(IsUnderReplicated(config, 2));
+    EXPECT_TRUE(IsUnderReplicated(config, 3));
+  }
+}
+
+// Verify logic of the kudu::consensus::CanEvictReplica(), anticipating
+// removal of a voter replica.
+TEST(QuorumUtilTest, CanEvictReplicaVoters) {
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V);
+    AddPeer(&config, "B", V);
+    AddPeer(&config, "C", V);
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", V, '-');
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '?');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", V, '+');
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+', {{"REPLACE", true}});
+    AddPeer(&config, "B", V);
+    AddPeer(&config, "C", V);
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+', {{"REPLACE", false}});
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", V, '+');
+    EXPECT_FALSE(CanEvictReplica(config, 4));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+    EXPECT_EQ("C", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", V, '+');
+    AddPeer(&config, "D", V, '+');
+    EXPECT_FALSE(CanEvictReplica(config, 4));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", V, '?', {{"REPLACE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("D", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", V, '-', {{"REPLACE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("D", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '-');
+    AddPeer(&config, "D", V, '+', {{"REPLACE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("D", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '?');
+    AddPeer(&config, "D", V, '+', {{"REPLACE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("D", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '?');
+    AddPeer(&config, "C", V, '+');
+    AddPeer(&config, "D", V, '+');
+    EXPECT_FALSE(CanEvictReplica(config, 4));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 3, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+}
+
+// Verify logic of the kudu::consensus::CanEvictReplica(), anticipating
+// removal of a non-voter replica.
+TEST(QuorumUtilTest, CanEvictReplicaNonVoters) {
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V);
+    EXPECT_FALSE(CanEvictReplica(config, 1));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    EXPECT_FALSE(CanEvictReplica(config, 1));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", N);
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", N, '+');
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("C", uuid_to_evict);
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("C", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", N, '-', {{"PROMOTE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", N, '-');
+    AddPeer(&config, "C", N);
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", N, '?');
+    AddPeer(&config, "C", N, '+');
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("B", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V);
+    AddPeer(&config, "C", N);
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+    EXPECT_EQ("C", uuid_to_evict);
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V);
+    AddPeer(&config, "C", N);
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    EXPECT_FALSE(CanEvictReplica(config, 1));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '+');
+    AddPeer(&config, "B", V, '-');
+    AddPeer(&config, "C", N, '-', {{"PROMOTE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 2));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 1, &uuid_to_evict));
+  }
+  {
+    RaftConfigPB config;
+    AddPeer(&config, "A", V, '-');
+    AddPeer(&config, "B", V, '+');
+    AddPeer(&config, "C", V, '+');
+    AddPeer(&config, "D", N, '+', {{"PROMOTE", true}});
+    EXPECT_FALSE(CanEvictReplica(config, 3));
+    string uuid_to_evict;
+    ASSERT_TRUE(CanEvictReplica(config, 2, &uuid_to_evict));
+    EXPECT_EQ("D", uuid_to_evict);
+  }
+}
+
 } // namespace consensus
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.cc b/src/kudu/consensus/quorum_util.cc
index 90d9671..e002070 100644
--- a/src/kudu/consensus/quorum_util.cc
+++ b/src/kudu/consensus/quorum_util.cc
@@ -17,6 +17,7 @@
 #include "kudu/consensus/quorum_util.h"
 
 #include <map>
+#include <ostream>
 #include <set>
 #include <string>
 #include <utility>
@@ -35,6 +36,7 @@ using google::protobuf::RepeatedPtrField;
 using kudu::pb_util::SecureShortDebugString;
 using std::map;
 using std::pair;
+using std::set;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -382,5 +384,190 @@ string DiffConsensusStates(const ConsensusStatePB& old_state,
   return JoinStrings(change_strs, ", ");
 }
 
+// The decision is based on:
+//
+//   * the number of voter replicas in definitively bad shape and replicas
+//     marked with the REPLACE attribute
+//
+//   * the number of non-voter replicas marked with the PROMOTE=true attribute
+//     in good or possibly good state.
+//
+// This is because a replica with UNKNOWN reported health state might actually
+// be in good shape. If so, then adding a new replica would lead to
+// over-provisioning. This logic assumes that a non-voter replica does not
+// stay in unknown state for eternity -- the leader replica should take care of
+// that and eventually update the health status either to 'HEALTHY' or 'FAILED'.
+//
+// TODO(aserbin): add a test scenario for the leader replica's logic to cover
+//                the latter case.
+bool IsUnderReplicated(const RaftConfigPB& config, int replication_factor) {
+  int num_voters_total = 0;
+  int num_voters_need_replacement = 0;
+  int num_non_voters_to_promote = 0;
+
+  // While working with the optional fields related to per-replica health status
+  // and attributes, the corresponding has_...() methods are not called here
+  // because those fields have appropriate default values.
+  for (const RaftPeerPB& peer : config.peers()) {
+    switch (peer.member_type()) {
+      case RaftPeerPB::VOTER:
+        ++num_voters_total;
+        if (peer.attrs().replace() ||
+            peer.health_report().overall_health() == HealthReportPB::FAILED) {
+          ++num_voters_need_replacement;
+        }
+        break;
+      case RaftPeerPB::NON_VOTER:
+        if (peer.attrs().promote() &&
+            peer.health_report().overall_health() != HealthReportPB::FAILED) {
+          ++num_non_voters_to_promote;
+        }
+        break;
+      default:
+        LOG(DFATAL) << peer.member_type() << ": unsupported member type";
+        break;
+    }
+  }
+  return replication_factor > (num_voters_total - num_voters_need_replacement) +
+      num_non_voters_to_promote;
+}
+
+// Whether there is an excess replica to evict.
+bool CanEvictReplica(const RaftConfigPB& config,
+                     int replication_factor,
+                     string* uuid_to_evict) {
+  int num_non_voters_total = 0;
+
+  int num_voters_healthy = 0;
+  int num_voters_total = 0;
+
+  string non_voter_any;
+  string non_voter_failed;
+  string non_voter_unknown_health;
+
+  string voter_any;
+  string voter_failed;
+  string voter_replace;
+  string voter_unknown_health;
+
+  // While working with the optional fields related to per-replica health status
+  // and attributes, the corresponding has_...() methods are not called here
+  // because those fields have appropriate default values.
+  for (const RaftPeerPB& peer : config.peers()) {
+    DCHECK(peer.has_permanent_uuid() && !peer.permanent_uuid().empty());
+    const string& peer_uuid = peer.permanent_uuid();
+    const bool failed = peer.health_report().overall_health() == HealthReportPB::FAILED;
+    const bool healthy = peer.health_report().overall_health() == HealthReportPB::HEALTHY;
+    const bool unknown = !peer.has_health_report() ||
+        !peer.health_report().has_overall_health() ||
+        peer.health_report().overall_health() == HealthReportPB::UNKNOWN;
+    const bool has_replace = peer.attrs().replace();
+
+    switch (peer.member_type()) {
+      case RaftPeerPB::VOTER:
+        ++num_voters_total;
+        if (healthy && !has_replace) {
+          ++num_voters_healthy;
+        }
+        if (failed && has_replace) {
+          voter_failed = voter_replace = peer_uuid;
+        }
+        if (failed && voter_failed.empty()) {
+          voter_failed = peer_uuid;
+        }
+        if (has_replace && voter_replace.empty()) {
+          voter_replace = peer_uuid;
+        }
+        if (unknown && voter_unknown_health.empty()) {
+          voter_unknown_health = peer_uuid;
+        }
+        voter_any = peer_uuid;
+        break;
+
+      case RaftPeerPB::NON_VOTER:
+        ++num_non_voters_total;
+        if (failed && non_voter_failed.empty()) {
+          non_voter_failed = peer_uuid;
+        }
+        if (unknown && non_voter_unknown_health.empty()) {
+          non_voter_unknown_health = peer_uuid;
+        }
+        non_voter_any = peer_uuid;
+        break;
+
+      default:
+        LOG(DFATAL) << peer.member_type() << ": unsupported member type";
+        break;
+    }
+  }
+
+  // A conservative approach is used when evicting replicas.
+  //
+  // A non-voter replica may be evicted only if the number of voter replicas
+  // in good health without the REPLACE attribute is greater than or equal to
+  // the specified replication factor. The idea is to avoid evicting non-voter
+  // replicas while there is uncertainty about the actual health status of the
+  // tablet's voter replicas: an existing non-voter replica could be a good
+  // fit to replace a voter replica whose actual health status is not yet
+  // known.
+  //
+  // A voter replica may be evicted only if the number of voter replicas in good
+  // health without the REPLACE attribute is greater than or equal to the
+  // specified replication factor. Removing voter replicas from the tablet
+  // without exact knowledge of their health status could lead to removing the
+  // healthy ones and keeping the failed ones.
+  const bool can_evict_non_voter =
+      num_voters_healthy >= replication_factor && num_non_voters_total > 0;
+  const bool can_evict_voter =
+      num_voters_healthy > replication_factor ||
+      (num_voters_healthy == replication_factor &&
+       num_voters_total > replication_factor);
+
+  const bool can_evict = can_evict_non_voter || can_evict_voter;
+  string to_evict;
+  if (can_evict_non_voter) {
+    // First try to evict an excess non-voter, if present. This is to avoid
+    // possible IO load due to tablet copy in progress.
+    //
+    // Non-voter candidates for eviction (in decreasing priority):
+    //   * failed
+    //   * in unknown health state
+    //   * any other
+    if (!non_voter_failed.empty()) {
+      to_evict = non_voter_failed;
+    } else if (!non_voter_unknown_health.empty()) {
+      to_evict = non_voter_unknown_health;
+    } else {
+      to_evict = non_voter_any;
+    }
+  } else if (can_evict_voter) {
+    // Next try to evict an excess voter.
+    //
+    // Voter candidates for eviction (in decreasing priority):
+    //   * failed and having the attribute REPLACE set
+    //   * having the attribute REPLACE set
+    //   * failed
+    //   * in unknown health state
+    //   * any other
+    if (!voter_replace.empty()) {
+      to_evict = voter_replace;
+    } else if (!voter_failed.empty()) {
+      to_evict = voter_failed;
+    } else if (!voter_unknown_health.empty()) {
+      to_evict = voter_unknown_health;
+    } else {
+      to_evict = voter_any;
+    }
+  }
+
+  if (can_evict) {
+    DCHECK(!to_evict.empty());
+    if (uuid_to_evict) {
+      *uuid_to_evict = to_evict;
+    }
+  }
+  return can_evict;
+}
+
 }  // namespace consensus
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/consensus/quorum_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index 3bd0afc..b0d12ae 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -88,6 +88,18 @@ std::string DiffConsensusStates(const ConsensusStatePB& old_state,
 std::string DiffRaftConfigs(const RaftConfigPB& old_config,
                             const RaftConfigPB& new_config);
 
+// Return true iff the tablet is under-replicated given its Raft configuration
+// (with its members' reported health) and the specified replication factor.
+bool IsUnderReplicated(const RaftConfigPB& config, int replication_factor);
+
+// Check if the given Raft configuration contains at least one extra replica
+// which can be removed in accordance with the specified replication
+// factor. If so, then return 'true' and set the UUID of the best suited
+// replica into the 'uuid_to_evict' out parameter. Otherwise, return 'false'.
+bool CanEvictReplica(const RaftConfigPB& config,
+                     int replication_factor,
+                     std::string* uuid_to_evict = nullptr);
+
 }  // namespace consensus
 }  // namespace kudu
 

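For reference, a minimal usage sketch of the two helpers declared above,
mirroring the quorum_util-test.cc cases earlier in this patch. The
ExampleUsage() wrapper is illustrative only; RaftConfigPB, RaftPeerPB and
HealthReportPB are assumed to come from kudu/consensus/metadata.pb.h.

  #include <string>

  #include "kudu/consensus/metadata.pb.h"
  #include "kudu/consensus/quorum_util.h"

  namespace kudu {
  namespace consensus {

  void ExampleUsage() {
    // Build a config with three VOTER replicas, all reported HEALTHY.
    RaftConfigPB config;
    for (const char* uuid : {"A", "B", "C"}) {
      RaftPeerPB* peer = config.add_peers();
      peer->set_permanent_uuid(uuid);
      peer->set_member_type(RaftPeerPB::VOTER);
      peer->mutable_health_report()->set_overall_health(HealthReportPB::HEALTHY);
    }

    // A fully healthy 3-voter config satisfies replication factor 3:
    // it is neither under-replicated nor over-replicated.
    bool under = IsUnderReplicated(config, 3);      // false
    bool can_evict = CanEvictReplica(config, 3);    // false

    // Mark replica 'C' as FAILED: the config becomes under-replicated, and the
    // failed voter cannot be evicted yet since only two healthy voters remain.
    config.mutable_peers()->Mutable(2)->mutable_health_report()
        ->set_overall_health(HealthReportPB::FAILED);
    under = IsUnderReplicated(config, 3);           // true
    std::string uuid_to_evict;
    can_evict = CanEvictReplica(config, 3, &uuid_to_evict);  // false
    (void)under;
    (void)can_evict;
  }

  }  // namespace consensus
  }  // namespace kudu
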
http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index a190ebe..45dcc2b 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -57,6 +57,7 @@ METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
 METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
 
 using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalMaster;
 using kudu::cluster::ExternalTabletServer;
 using kudu::consensus::RaftPeerPB;
 using kudu::itest::AddServer;
@@ -305,7 +306,7 @@ TEST_F(RaftConsensusNonVoterITest, AddNonVoterReplica) {
   // The master should report about the newly added NON_VOTER tablet replica
   // to the established leader.
   bool has_leader;
-  master::TabletLocationsPB tablet_locations;
+  TabletLocationsPB tablet_locations;
   ASSERT_OK(WaitForReplicasReportedToMaster(
       cluster_->master_proxy(), kOriginalReplicasNum + 1, tablet_id, kTimeout,
       WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -429,7 +430,7 @@ TEST_F(RaftConsensusNonVoterITest, AddThenRemoveNonVoterReplica) {
   // should report appropriate replica count at this point. The tablet leader
   // should be established.
   bool has_leader;
-  master::TabletLocationsPB tablet_locations;
+  TabletLocationsPB tablet_locations;
   ASSERT_OK(WaitForReplicasReportedToMaster(
       cluster_->master_proxy(), kOriginalReplicasNum, tablet_id, kTimeout,
       WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -745,7 +746,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
     // The removed tablet replica should be gone, and the master should report
     // appropriate replica count at this point.
     bool has_leader;
-    master::TabletLocationsPB tablet_locations;
+    TabletLocationsPB tablet_locations;
     ASSERT_OK(WaitForReplicasReportedToMaster(
         cluster_->master_proxy(), kInitialReplicasNum, tablet_id, kTimeout,
         WAIT_FOR_LEADER, &has_leader, &tablet_locations));
@@ -893,5 +894,151 @@ TEST_F(RaftConsensusNonVoterITest, PromotedReplicaCanVote) {
   ASSERT_EQ(new_replica_uuid, leader->uuid());
 }
 
+// Add an extra non-voter replica to the tablet and make sure the catalog
+// manager evicts it once it sees the updated tablet state.
+TEST_F(RaftConsensusNonVoterITest, CatalogManagerEvictsExcessNonVoter) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const int kReplicaUnavailableSec = 5;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const int kReplicasNum = 3;
+  FLAGS_num_replicas = kReplicasNum;
+  // Need one extra tserver for a new non-voter replica.
+  FLAGS_num_tablet_servers = kReplicasNum + 1;
+  const vector<string> kMasterFlags = {
+    // The scenario runs with the new replica management scheme.
+    "--raft_prepare_replacement_before_eviction=true",
+    // Don't evict excess replicas to avoid races in the scenario.
+    "--catalog_manager_evict_excess_replicas=false",
+  };
+  const vector<string> kTserverFlags = {
+    // The scenario runs with the new replica management scheme.
+    "--raft_prepare_replacement_before_eviction=true",
+    Substitute("--consensus_rpc_timeout_ms=$0", 1000 * kReplicaUnavailableSec),
+    Substitute("--follower_unavailable_considered_failed_sec=$0",
+              kReplicaUnavailableSec),
+  };
+
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size());
+  ASSERT_EQ(kReplicasNum, tablet_replicas_.size());
+
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id_) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+  ASSERT_OK(AddReplica(tablet_id_, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  bool has_leader = false;
+  TabletLocationsPB tablet_locations;
+  // Make sure the extra replica is seen by the master.
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            kReplicasNum + 1,
+                                            tablet_id_,
+                                            kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader,
+                                            &tablet_locations));
+
+  // Switch the catalog manager to start evicting excess replicas
+  // (that's how it runs by default in the new replica management scheme).
+  // Enabling this any earlier might lead to a race: the newly added
+  // non-voter replica might be evicted before it's spotted by the
+  // WaitForReplicasReportedToMaster() call above.
+  for (auto i = 0; i < cluster_->num_masters(); ++i) {
+    ExternalMaster* m = cluster_->master(i);
+    ASSERT_OK(cluster_->SetFlag(m,
+        "catalog_manager_evict_excess_replicas", "true"));
+  }
+
+  ExternalTabletServer* new_replica_ts =
+      cluster_->tablet_server_by_uuid(new_replica->uuid());
+  ASSERT_NE(nullptr, new_replica_ts);
+  ASSERT_OK(new_replica_ts->Pause());
+  SCOPED_CLEANUP({
+    ASSERT_OK(new_replica_ts->Resume());
+  });
+  SleepFor(MonoDelta::FromSeconds(2 * kReplicaUnavailableSec));
+  ASSERT_OK(new_replica_ts->Resume());
+
+  // Make sure the excess non-voter replica is gone.
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            kReplicasNum,
+                                            tablet_id_,
+                                            kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader,
+                                            &tablet_locations));
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
+// Check that the catalog manager adds a non-voter replica to replace a failed
+// voter replica in a tablet.
+//
+// TODO(aserbin): also make this scenario run with 5 tablet servers.
+TEST_F(RaftConsensusNonVoterITest, CatalogManagerAddsNonVoter) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const int kReplicaUnavailableSec = 10;
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(6 * kReplicaUnavailableSec);
+  const int kReplicasNum = 3;
+  FLAGS_num_replicas = kReplicasNum;
+  // Need one extra tserver after the tserver hosting one of the replicas is
+  // stopped. Otherwise, the catalog manager would not be able to spawn a new
+  // non-voter replacement replica.
+  FLAGS_num_tablet_servers = kReplicasNum + 1;
+  const vector<string> kMasterFlags = {
+    // The scenario runs with the new replica management scheme.
+    "--raft_prepare_replacement_before_eviction=true",
+    // Don't evict excess replicas to avoid races in the scenario.
+    "--catalog_manager_evict_excess_replicas=false",
+  };
+  const vector<string> kTserverFlags = {
+    // The scenario runs with the new replica management scheme.
+    "--raft_prepare_replacement_before_eviction=true",
+    Substitute("--follower_unavailable_considered_failed_sec=$0",
+               kReplicaUnavailableSec),
+  };
+
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(kReplicasNum + 1, tablet_servers_.size());
+  ASSERT_EQ(kReplicasNum, tablet_replicas_.size());
+
+  ExternalTabletServer* ts0 = cluster_->tablet_server(0);
+  ASSERT_NE(nullptr, ts0);
+  ts0->Shutdown();
+
+  // Wait for a new non-voter replica added by the catalog manager to
+  // replace the failed one.
+  bool has_leader = false;
+  TabletLocationsPB tablet_locations;
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            kReplicasNum + 1,
+                                            tablet_id_,
+                                            kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader,
+                                            &tablet_locations));
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
 }  // namespace tserver
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 6702750..5131bd3 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -60,6 +60,7 @@
 #include <boost/function.hpp>
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/cfile/type_encodings.h"
@@ -225,6 +226,41 @@ DEFINE_int32(catalog_manager_inject_latency_prior_tsk_write_ms, 0,
 TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, hidden);
 TAG_FLAG(catalog_manager_inject_latency_prior_tsk_write_ms, unsafe);
 
+DEFINE_bool(catalog_manager_evict_excess_replicas, true,
+            "Whether the catalog manager evicts excess replicas from tablet "
+            "configurations based on the replication factor.");
+TAG_FLAG(catalog_manager_evict_excess_replicas, hidden);
+TAG_FLAG(catalog_manager_evict_excess_replicas, runtime);
+
+DECLARE_bool(raft_prepare_replacement_before_eviction);
+
+using base::subtle::NoBarrier_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
+using kudu::cfile::TypeEncodingInfo;
+using kudu::consensus::ConsensusServiceProxy;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::GetConsensusRole;
+using kudu::consensus::IsRaftConfigMember;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::consensus::StartTabletCopyRequestPB;
+using kudu::consensus::kMinimumTerm;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcContext;
+using kudu::security::Cert;
+using kudu::security::DataFormat;
+using kudu::security::PrivateKey;
+using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
+using kudu::security::TokenSigningPrivateKeyPB;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletStatePB;
+using kudu::tserver::TabletServerErrorPB;
 using std::pair;
 using std::set;
 using std::shared_ptr;
@@ -233,38 +269,11 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
 using std::vector;
+using strings::Substitute;
 
 namespace kudu {
 namespace master {
 
-using base::subtle::NoBarrier_CompareAndSwap;
-using base::subtle::NoBarrier_Load;
-using cfile::TypeEncodingInfo;
-using consensus::ConsensusServiceProxy;
-using consensus::ConsensusStatePB;
-using consensus::GetConsensusRole;
-using consensus::IsRaftConfigMember;
-using consensus::RaftConsensus;
-using consensus::RaftPeerPB;
-using consensus::StartTabletCopyRequestPB;
-using consensus::kMinimumTerm;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
-using rpc::RpcContext;
-using security::Cert;
-using security::DataFormat;
-using security::PrivateKey;
-using security::TokenSigner;
-using security::TokenSigningPrivateKey;
-using security::TokenSigningPrivateKeyPB;
-using strings::Substitute;
-using tablet::TABLET_DATA_DELETED;
-using tablet::TABLET_DATA_TOMBSTONED;
-using tablet::TabletDataState;
-using tablet::TabletReplica;
-using tablet::TabletStatePB;
-using tserver::TabletServerErrorPB;
-
 ////////////////////////////////////////////////////////////
 // Table Loader
 ////////////////////////////////////////////////////////////
@@ -2983,67 +2992,139 @@ class AsyncAlterTable : public RetryingTSRpcTask {
   tserver::AlterSchemaResponsePB resp_;
 };
 
-class AsyncAddServerTask : public RetryingTSRpcTask {
+class AsyncChangeConfigTask : public RetryingTSRpcTask {
  public:
-  AsyncAddServerTask(Master *master,
-                     const scoped_refptr<TabletInfo>& tablet,
-                     ConsensusStatePB cstate,
-                     ThreadSafeRandom* rng)
-    : RetryingTSRpcTask(master,
-                        gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
-                        tablet->table()),
-      tablet_(tablet),
-      cstate_(std::move(cstate)),
-      rng_(rng) {
-    deadline_ = MonoTime::Max(); // Never time out.
-  }
-
-  string type_name() const override { return "AddServer ChangeConfig"; }
+  AsyncChangeConfigTask(Master* master,
+                        scoped_refptr<TabletInfo> tablet,
+                        ConsensusStatePB cstate,
+                        consensus::ChangeConfigType change_config_type);
 
   string description() const override;
 
  protected:
-  bool SendRequest(int attempt) override;
   void HandleResponse(int attempt) override;
-
- private:
-  string tablet_id() const override { return tablet_->id(); }
+  bool CheckOpIdIndex();
 
   const scoped_refptr<TabletInfo> tablet_;
   const ConsensusStatePB cstate_;
+  const consensus::ChangeConfigType change_config_type_;
 
-  // Used to make random choices in replica selection.
-  ThreadSafeRandom* rng_;
-
-  consensus::ChangeConfigRequestPB req_;
   consensus::ChangeConfigResponsePB resp_;
+
+ private:
+  string tablet_id() const override { return tablet_->id(); }
 };
 
-string AsyncAddServerTask::description() const {
-  return Substitute("AddServer ChangeConfig RPC for tablet $0 "
-                    "with cas_config_opid_index $1",
+AsyncChangeConfigTask::AsyncChangeConfigTask(Master* master,
+                                             scoped_refptr<TabletInfo> tablet,
+                                             ConsensusStatePB cstate,
+                                             consensus::ChangeConfigType change_config_type)
+    : RetryingTSRpcTask(master,
+                        gscoped_ptr<TSPicker>(new PickLeaderReplica(tablet)),
+                        tablet->table()),
+      tablet_(std::move(tablet)),
+      cstate_(std::move(cstate)),
+      change_config_type_(change_config_type) {
+  deadline_ = MonoTime::Max(); // Never time out.
+}
+
+string AsyncChangeConfigTask::description() const {
+  return Substitute("$0 RPC for tablet $1 with cas_config_opid_index $2",
+                    type_name(),
                     tablet_->id(),
                     cstate_.committed_config().opid_index());
 }
 
-bool AsyncAddServerTask::SendRequest(int attempt) {
-  LOG(INFO) << Substitute("Sending request for AddServer on tablet $0 (attempt $1)",
-                          tablet_->id(), attempt);
+void AsyncChangeConfigTask::HandleResponse(int attempt) {
+  if (!resp_.has_error()) {
+    MarkComplete();
+    LOG_WITH_PREFIX(INFO) << Substitute("$0 succeeded (attempt $1)",
+                                        type_name(), attempt);
+    return;
+  }
 
-  // Bail if we're retrying in vain.
+  Status status = StatusFromPB(resp_.error().status());
+
+  // Do not retry on a CAS error, otherwise retry forever or until cancelled.
+  switch (resp_.error().code()) {
+    case TabletServerErrorPB::CAS_FAILED:
+      LOG_WITH_PREFIX(WARNING) << Substitute("$0 failed with leader $1 "
+          "due to CAS failure; no further retry: $2",
+          type_name(), target_ts_desc_->ToString(),
+          status.ToString());
+      MarkFailed();
+      break;
+    default:
+      LOG_WITH_PREFIX(INFO) << Substitute("$0 failed with leader $1 "
+          "due to error $2; will retry: $3",
+          type_name(), target_ts_desc_->ToString(),
+          TabletServerErrorPB::Code_Name(resp_.error().code()), status.ToString());
+      break;
+  }
+}
+
+bool AsyncChangeConfigTask::CheckOpIdIndex() {
   int64_t latest_index;
   {
     TabletMetadataLock tablet_lock(tablet_.get(), LockMode::READ);
     latest_index = tablet_lock.data().pb.consensus_state()
-      .committed_config().opid_index();
+        .committed_config().opid_index();
   }
   if (latest_index > cstate_.committed_config().opid_index()) {
-    LOG_WITH_PREFIX(INFO) << Substitute("Latest config for has opid_index of $0 "
-        "while this task has opid_index of $1. Aborting task",
+    LOG_WITH_PREFIX(INFO) << Substitute("aborting the task: "
+        "latest config opid_index $0; task opid_index $1",
         latest_index, cstate_.committed_config().opid_index());
     MarkAborted();
     return false;
   }
+  return true;
+}
+
+class AsyncAddReplicaTask : public AsyncChangeConfigTask {
+ public:
+  AsyncAddReplicaTask(Master* master,
+                      scoped_refptr<TabletInfo> tablet,
+                      ConsensusStatePB cstate,
+                      RaftPeerPB::MemberType member_type,
+                      ThreadSafeRandom* rng);
+
+  string type_name() const override;
+
+ protected:
+  bool SendRequest(int attempt) override;
+
+ private:
+  const RaftPeerPB::MemberType member_type_;
+
+  // Used to make random choices in replica selection.
+  ThreadSafeRandom* rng_;
+};
+
+AsyncAddReplicaTask::AsyncAddReplicaTask(Master* master,
+                                         scoped_refptr<TabletInfo> tablet,
+                                         ConsensusStatePB cstate,
+                                         RaftPeerPB::MemberType member_type,
+                                         ThreadSafeRandom* rng)
+    : AsyncChangeConfigTask(master, std::move(tablet), std::move(cstate),
+                            consensus::ADD_SERVER),
+      member_type_(member_type),
+      rng_(rng) {
+}
+
+string AsyncAddReplicaTask::type_name() const {
+  return Substitute("ChangeConfig:$0:$1",
+                    consensus::ChangeConfigType_Name(change_config_type_),
+                    RaftPeerPB::MemberType_Name(member_type_));
+}
+
+bool AsyncAddReplicaTask::SendRequest(int attempt) {
+  // Bail if we're retrying in vain.
+  if (!CheckOpIdIndex()) {
+    return false;
+  }
+
+  LOG(INFO) << Substitute("Sending $0 on tablet $1 (attempt $2)",
+                          type_name(), tablet_->id(), attempt);
 
   // Select the replica we wish to add to the config.
   // Do not include current members of the config.
@@ -3063,48 +3144,76 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
     return false;
   }
 
-  req_.set_dest_uuid(target_ts_desc_->permanent_uuid());
-  req_.set_tablet_id(tablet_->id());
-  req_.set_type(consensus::ADD_SERVER);
-  req_.set_cas_config_opid_index(cstate_.committed_config().opid_index());
-  RaftPeerPB* peer = req_.mutable_server();
+  consensus::ChangeConfigRequestPB req;
+  req.set_dest_uuid(target_ts_desc_->permanent_uuid());
+  req.set_tablet_id(tablet_->id());
+  req.set_type(consensus::ADD_SERVER);
+  req.set_cas_config_opid_index(cstate_.committed_config().opid_index());
+  RaftPeerPB* peer = req.mutable_server();
   peer->set_permanent_uuid(replacement_replica->permanent_uuid());
   ServerRegistrationPB peer_reg;
   replacement_replica->GetRegistration(&peer_reg);
   CHECK_GT(peer_reg.rpc_addresses_size(), 0);
   *peer->mutable_last_known_addr() = peer_reg.rpc_addresses(0);
-  peer->set_member_type(RaftPeerPB::VOTER);
-  VLOG(1) << Substitute("Sending AddServer ChangeConfig request to $0: $1",
-                        target_ts_desc_->ToString(), SecureDebugString(req_));
-  consensus_proxy_->ChangeConfigAsync(req_, &resp_, &rpc_,
-                                      boost::bind(&AsyncAddServerTask::RpcCallback, this));
+  peer->set_member_type(member_type_);
+  VLOG(1) << Substitute("Sending $0 request to $1: $2",
+                        type_name(), target_ts_desc_->ToString(), SecureDebugString(req));
+  consensus_proxy_->ChangeConfigAsync(req, &resp_, &rpc_,
+                                      boost::bind(&AsyncAddReplicaTask::RpcCallback, this));
   return true;
 }
 
-void AsyncAddServerTask::HandleResponse(int attempt) {
-  if (!resp_.has_error()) {
-    MarkComplete();
-    LOG_WITH_PREFIX(INFO) << "Config change to add server succeeded";
-    return;
-  }
+class AsyncEvictReplicaTask : public AsyncChangeConfigTask {
+ public:
+  AsyncEvictReplicaTask(Master *master,
+                        scoped_refptr<TabletInfo> tablet,
+                        ConsensusStatePB cstate,
+                        string peer_uuid_to_evict);
 
-  Status status = StatusFromPB(resp_.error().status());
+  string type_name() const override;
 
-  // Do not retry on a CAS error, otherwise retry forever or until cancelled.
-  switch (resp_.error().code()) {
-    case TabletServerErrorPB::CAS_FAILED:
-      LOG_WITH_PREFIX(WARNING) << Substitute("ChangeConfig() failed with leader $0 "
-          "due to CAS failure. No further retry: $1",
-          target_ts_desc_->ToString(), status.ToString());
-      MarkFailed();
-      break;
-    default:
-      LOG_WITH_PREFIX(INFO) << Substitute("ChangeConfig() failed with leader $0 "
-          "due to error $1. This operation will be retried. Error detail: $2",
-          target_ts_desc_->ToString(),
-          TabletServerErrorPB::Code_Name(resp_.error().code()), status.ToString());
-      break;
+ protected:
+  bool SendRequest(int attempt) override;
+
+ private:
+  const string peer_uuid_to_evict_;
+};
+
+AsyncEvictReplicaTask::AsyncEvictReplicaTask(Master* master,
+                                             scoped_refptr<TabletInfo> tablet,
+                                             ConsensusStatePB cstate,
+                                             string peer_uuid_to_evict)
+    : AsyncChangeConfigTask(master, std::move(tablet), std::move(cstate),
+                            consensus::REMOVE_SERVER),
+      peer_uuid_to_evict_(std::move(peer_uuid_to_evict)) {
+}
+
+string AsyncEvictReplicaTask::type_name() const {
+  return Substitute("ChangeConfig:$0",
+                    consensus::ChangeConfigType_Name(change_config_type_));
+}
+
+bool AsyncEvictReplicaTask::SendRequest(int attempt) {
+  // Bail if we're retrying in vain.
+  if (!CheckOpIdIndex()) {
+    return false;
   }
+
+  LOG(INFO) << Substitute("Sending $0 on tablet $1 (attempt $2)",
+                          type_name(), tablet_->id(), attempt);
+
+  consensus::ChangeConfigRequestPB req;
+  req.set_dest_uuid(target_ts_desc_->permanent_uuid());
+  req.set_tablet_id(tablet_->id());
+  req.set_type(consensus::REMOVE_SERVER);
+  req.set_cas_config_opid_index(cstate_.committed_config().opid_index());
+  RaftPeerPB* peer = req.mutable_server();
+  peer->set_permanent_uuid(peer_uuid_to_evict_);
+  VLOG(1) << Substitute("Sending $0 request to $1: $2",
+                        type_name(), target_ts_desc_->ToString(), SecureDebugString(req));
+  consensus_proxy_->ChangeConfigAsync(req, &resp_, &rpc_,
+                                      boost::bind(&AsyncEvictReplicaTask::RpcCallback, this));
+  return true;
 }
 
 Status CatalogManager::ProcessTabletReport(
@@ -3265,6 +3374,8 @@ Status CatalogManager::ProcessTabletReport(
       continue;
     }
 
+    const auto replication_factor = table->metadata().state().pb.num_replicas();
+    bool consensus_state_updated = false;
     // 7. Process the report's consensus state. There may be one even when the
     // replica has been tombstoned.
     if (report.has_consensus_state()) {
@@ -3306,11 +3417,12 @@ Status CatalogManager::ProcessTabletReport(
       //   the committed config's opid_index).
       // - The new cstate has a leader, and either the old cstate didn't, or
       //   there was a term change.
-      if (cstate.committed_config().opid_index() > prev_cstate.committed_config().opid_index() ||
+      consensus_state_updated = (cstate.committed_config().opid_index() >
+                                 prev_cstate.committed_config().opid_index()) ||
           (!cstate.leader_uuid().empty() &&
            (prev_cstate.leader_uuid().empty() ||
-            cstate.current_term() > prev_cstate.current_term()))) {
-
+            cstate.current_term() > prev_cstate.current_term()));
+      if (consensus_state_updated) {
         // 7d(i). Retain knowledge of the leader even if it wasn't reported in
         // the latest config.
         //
@@ -3326,13 +3438,12 @@ Status CatalogManager::ProcessTabletReport(
           } else if (!cstate.leader_uuid().empty() &&
               !prev_cstate.leader_uuid().empty() &&
               cstate.leader_uuid() != prev_cstate.leader_uuid()) {
-            string msg = Substitute("Previously reported cstate for tablet $0 gave "
+            LOG(DFATAL) << Substitute("Previously reported cstate for tablet $0 gave "
                 "a different leader for term $1 than the current cstate. "
                 "Previous cstate: $2. Current cstate: $3.",
                 tablet->ToString(), cstate.current_term(),
                 SecureShortDebugString(prev_cstate),
                 SecureShortDebugString(cstate));
-            LOG(DFATAL) << msg;
             continue;
           }
         }
@@ -3368,15 +3479,37 @@ Status CatalogManager::ProcessTabletReport(
             }
           }
         }
+      }
 
-        // 7d(iv). Add a server to the config if it is under-replicated.
+      // 7e. Make a tablet configuration change depending on the mode the server
+      // is running in. The choice between the two alternative modes is
+      // controlled by the 'raft_prepare_replacement_before_eviction' run-time flag.
+      if (FLAGS_raft_prepare_replacement_before_eviction) {
+        // An alternative scheme of managing tablet replicas: the catalog
+        // manager processes the health-related info on replicas from the tablet
+        // report and initiates appropriate modifications for the tablet Raft
+        // configuration: evict an already-replaced failed voter replica or add
+        // a new non-voter replica marked for promotion as a replacement.
+        const RaftConfigPB& config = cstate.committed_config();
+        string to_evict;
+        if (IsUnderReplicated(config, replication_factor)) {
+          rpcs.emplace_back(new AsyncAddReplicaTask(
+              master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
+        } else if (PREDICT_TRUE(FLAGS_catalog_manager_evict_excess_replicas) &&
+                   CanEvictReplica(config, replication_factor, &to_evict)) {
+          DCHECK(!to_evict.empty());
+          rpcs.emplace_back(new AsyncEvictReplicaTask(
+              master_, tablet, cstate, std::move(to_evict)));
+        }
+      } else if (consensus_state_updated &&
+                 FLAGS_master_add_server_when_underreplicated &&
+                 CountVoters(cstate.committed_config()) < replication_factor) {
+        // Add a server to the config if it is under-replicated.
         //
         // This is an idempotent operation due to a CAS enforced on the
         // committed config's opid_index.
-        if (FLAGS_master_add_server_when_underreplicated &&
-            CountVoters(cstate.committed_config()) < table->metadata().state().pb.num_replicas()) {
-          rpcs.emplace_back(new AsyncAddServerTask(master_, tablet, cstate, &rng_));
-        }
+        rpcs.emplace_back(new AsyncAddReplicaTask(
+            master_, tablet, cstate, RaftPeerPB::VOTER, &rng_));
       }
     }
 
@@ -3820,7 +3953,7 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc
   ConsensusStatePB* cstate = tablet->mutable_metadata()->mutable_dirty()
           ->pb.mutable_consensus_state();
   cstate->set_current_term(kMinimumTerm);
-  consensus::RaftConfigPB *config = cstate->mutable_committed_config();
+  RaftConfigPB *config = cstate->mutable_committed_config();
 
   // Maintain ability to downgrade Kudu to a version with LocalConsensus.
   if (nreplicas == 1) {
@@ -3836,7 +3969,7 @@ Status CatalogManager::SelectReplicasForTablet(const TSDescriptorVector& ts_desc
 
 void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& tablet,
                                              const TabletMetadataLock& tablet_lock) {
-  const consensus::RaftConfigPB& config =
+  const RaftConfigPB& config =
       tablet_lock.data().pb.consensus_state().committed_config();
   tablet->set_last_create_tablet_time(MonoTime::Now());
   for (const RaftPeerPB& peer : config.peers()) {
@@ -3850,7 +3983,7 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
 
 void CatalogManager::SelectReplicas(const TSDescriptorVector& ts_descs,
                                     int nreplicas,
-                                    consensus::RaftConfigPB *config) {
+                                    RaftConfigPB *config) {
   DCHECK_EQ(0, config->peers_size()) << "RaftConfig not empty: " << SecureShortDebugString(*config);
   DCHECK_LE(nreplicas, ts_descs.size());
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/87dcaf34/src/kudu/master/catalog_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index b25a655..5f406ad 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -797,7 +797,7 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
                                        uint32_t version);
 
   // Send the "create tablet request" to all peers of a particular tablet.
-  //.
+  //
   // The creation is async, and at the moment there is no error checking on the
   // caller side. We rely on the assignment timeout. If we don't see the tablet
   // after the timeout, we regenerate a new one and proceed with a new