Posted to commits@kudu.apache.org by mp...@apache.org on 2017/03/24 22:08:41 UTC

[2/2] kudu git commit: KUDU-1330: Add a tool to unsafely recover from loss of majority replicas

KUDU-1330: Add a tool to unsafely recover from loss of majority replicas

This patch adds an API to allow unsafe config change via an external
recovery tool 'kudu remote_replica unsafe_change_config'.

This tool lets us replace an N-replica config on a tablet server with a
new config containing N or fewer replicas. This is particularly useful
when a majority of the replicas are down and, for some reason, we cannot
bring the tablet back online using other recovery tools like
'kudu remote_replica copy'. We can use this tool to force a new config on
the surviving replica, providing all the replica UUIDs of the new config
via the CLI tool. As a result of the forced config change, automatic
leader election kicks in via Raft mechanisms, and re-replication is
triggered from the master (if the tablet is under-replicated) to bring
the replica count of the tablet back up to N.
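
For example (an illustrative invocation; the host:port, tablet id, and
replica UUIDs below are placeholders, with the argument order taken from
the test helper added in this patch):

  $ kudu remote_replica unsafe_change_config <tserver_host:port> <tablet_id> \
        <peer1_uuid> [<peer2_uuid> ...]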

How the tool brings the tablet back online with the new config:
a) The tool acts as a 'fake' leader and generates a consensus update with
   a bumped-up term along with the new config. The surviving node (leader
   or follower) accepts and replicates the request, then goes through a
   pre-election phase in which a leader is elected among the nodes provided
   in the config. If the new config provides enough VOTERs to win an
   election, the leader election succeeds and the new config is committed.
   The master can eventually recognize this consensus state change and, if
   it finds the tablet under-replicated, ensure the tablet is re-replicated
   back to a healthy count.
b) The assumption is that the dead nodes do not come back during this
   recovery, so the master will very likely choose new healthy live servers
   for re-replication if needed. If the dead nodes come back after the
   master is updated with the new unsafely forced config, the master will
   delete the replicas on those dead nodes via DeleteTablet RPCs because
   they are no longer part of the tablet config.

Also, the UnsafeChangeConfig() API adds a flag to append another
change_config op while there is already a pending config in the log. This
flag lifts the safety net around pending configs which states that there
can be at most one pending config for a given replica.

This patch is the first in a series for unsafe config changes, and assumes
that the dead servers do not come back while the new config change is
taking effect. Future revisions should weaken this assumption and build
more safety guarantees around dead nodes coming back during unsafe config
change operations on the cluster.

Tests associated with this patch:
- Unsafe config change when there is one follower survivor in the cluster.
- Unsafe config change when there is one leader survivor in the cluster.
- Unsafe config change when the unsafe config contains 2 replicas.
- Unsafe config change on a 5-replica config with 2 replicas in the new config.
- Unsafe config change when there is a pending config on the surviving leader.
- Unsafe config change when there is a pending config on a surviving follower.
- Unsafe config change when there are back-to-back pending configs in the
  WAL, and verify that the tablet bootstraps fine.
- Back-to-back unsafe config changes when there are multiple pending
  configs present on the replica, verifying that the one with a 'sane' new
  config brings the tablet back to the online state.

TODO:
1) Test exercising all the error cases in the UnsafeChangeConfig API.
2) Test the UnsafeChangeConfig RPC directly without going via the external tool.

Change-Id: I908d8c981df74d56dbd034e72001d379fb314700
Reviewed-on: http://gerrit.cloudera.org:8080/6066
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/80026a34
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/80026a34
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/80026a34

Branch: refs/heads/master
Commit: 80026a34f66949da77bddf209579f51cfc997e2a
Parents: cf25ec5
Author: Dinesh Bhat <di...@cloudera.com>
Authored: Wed Feb 15 01:46:18 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Mar 24 22:08:17 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus.h                  |   3 +
 src/kudu/consensus/consensus.proto              |  26 +
 src/kudu/consensus/consensus_meta.cc            |   8 +
 src/kudu/consensus/consensus_queue.cc           |   9 +-
 src/kudu/consensus/metadata.proto               |   3 +
 src/kudu/consensus/raft_consensus.cc            | 169 +++-
 src/kudu/consensus/raft_consensus.h             |   3 +
 src/kudu/consensus/raft_consensus_state.cc      |  51 +-
 src/kudu/consensus/time_manager.h               |   1 +
 .../integration-tests/cluster_itest_util.cc     |  61 ++
 src/kudu/integration-tests/cluster_itest_util.h |  22 +
 .../integration-tests/raft_consensus-itest.cc   |  90 +--
 src/kudu/tools/kudu-admin-test.cc               | 790 +++++++++++++++++++
 src/kudu/tools/kudu-tool-test.cc                |   5 +-
 src/kudu/tools/tool_action_remote_replica.cc    |  54 ++
 src/kudu/tserver/tablet_service.cc              |  28 +-
 src/kudu/tserver/tablet_service.h               |   4 +
 17 files changed, 1216 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index 5999fde..78edae3 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -251,6 +251,9 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
     return Status::NotSupported("Not implemented.");
   }
 
+  virtual Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
+                                    tserver::TabletServerErrorPB::Code* error) = 0;
+
   // Returns the current Raft role of this instance.
   virtual RaftPeerPB::Role role() const = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index e36e0bb..94ba128 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -482,6 +482,29 @@ message StartTabletCopyResponsePB {
   optional tserver.TabletServerErrorPB error = 1;
 }
 
+// An unsafe change configuration request for the tablet with 'tablet_id'.
+message UnsafeChangeConfigRequestPB {
+  // UUID of server this request is addressed to.
+  optional bytes dest_uuid = 1;
+  required bytes tablet_id = 2;
+
+  // Sender identification, it could be a static string as well.
+  required bytes caller_id = 3;
+
+  // The raft config sent to destination server.
+  // Only the 'permanent_uuid' of each peer in the config is
+  // required (address-related information is ignored by the server).
+  // The peers specified in the 'new_config' are required to be a
+  // subset of (or equal to) the peers in the committed config on the
+  // destination replica.
+  required RaftConfigPB new_config = 4;
+}
+
+// The unsafe change configuration response. 'error' field is set if operation failed.
+message UnsafeChangeConfigResponsePB {
+  optional tserver.TabletServerErrorPB error = 1;
+}
+
 // A Raft implementation.
 service ConsensusService {
   option (kudu.rpc.default_authz_method) = "AuthorizeServiceUser";
@@ -498,6 +521,9 @@ service ConsensusService {
   // An OK response means the operation was successful.
   rpc ChangeConfig(ChangeConfigRequestPB) returns (ChangeConfigResponsePB);
 
+  // Implements unsafe config change operation for manual recovery use cases.
+  rpc UnsafeChangeConfig(UnsafeChangeConfigRequestPB) returns (UnsafeChangeConfigResponsePB);
+
   rpc GetNodeInstance(GetNodeInstanceRequestPB) returns (GetNodeInstanceResponsePB);
 
   // Force this node to run a leader election.

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/consensus_meta.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc
index fdb2e27..977074c 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -24,10 +24,17 @@
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/stopwatch.h"
 
+DEFINE_double(fault_crash_before_cmeta_flush, 0.0,
+              "Fraction of the time when the server will crash just before flushing "
+              "consensus metadata. (For testing only!)");
+TAG_FLAG(fault_crash_before_cmeta_flush, unsafe);
+
 namespace kudu {
 namespace consensus {
 
@@ -186,6 +193,7 @@ void ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& c
 }
 
 Status ConsensusMetadata::Flush() {
+  MAYBE_FAULT(FLAGS_fault_crash_before_cmeta_flush);
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing consensus metadata");
 
   flush_count_for_tests_++;

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index ad98875..5358326 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -556,6 +556,8 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
 
   // If we haven't enough peers to calculate the watermark return.
   if (watermarks.size() < num_peers_required) {
+    VLOG_WITH_PREFIX_UNLOCKED(3) << "Watermarks size: " << watermarks.size() << ", "
+                                 << "Num peers required: " << num_peers_required;
     return;
   }
 
@@ -779,7 +781,12 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       } else {
         VLOG_WITH_PREFIX_UNLOCKED(2) << "Cannot advance commit index, waiting for > "
                                      << "first index in current leader term: "
-                                     << queue_state_.first_index_in_current_term;
+                                     << queue_state_.first_index_in_current_term << ". "
+                                     << "current majority_replicated_index: "
+                                     << queue_state_.majority_replicated_index << ", "
+                                     << "current committed_index: "
+                                     << queue_state_.committed_index;
+
       }
 
       // Only notify observers if the commit index actually changed.

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/metadata.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/metadata.proto b/src/kudu/consensus/metadata.proto
index a326039..b73cfcc 100644
--- a/src/kudu/consensus/metadata.proto
+++ b/src/kudu/consensus/metadata.proto
@@ -89,6 +89,9 @@ message RaftConfigPB {
   // Obsolete. This parameter has been retired.
   optional bool OBSOLETE_local = 2;
 
+  // Flag to allow unsafe config change operations.
+  optional bool unsafe_config_change = 4 [ default = false ];
+
   // The set of peers in the configuration.
   repeated RaftPeerPB peers = 3;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index d4bca32..d5de1a1 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -613,11 +613,13 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
 
     const RaftConfigPB& new_config = change_record->new_config();
 
-    Status s = state_->CheckNoConfigChangePendingUnlocked();
-    if (PREDICT_FALSE(!s.ok())) {
-      s = s.CloneAndAppend(Substitute("\n  New config: $0", SecureShortDebugString(new_config)));
-      LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
-      return s;
+    if (!new_config.unsafe_config_change()) {
+      Status s = state_->CheckNoConfigChangePendingUnlocked();
+      if (PREDICT_FALSE(!s.ok())) {
+        s = s.CloneAndAppend(Substitute("\n  New config: $0", SecureShortDebugString(new_config)));
+        LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
+        return s;
+      }
     }
     // Check if the pending Raft config has an OpId less than the committed
     // config. If so, this is a replay at startup in which the COMMIT
@@ -1161,7 +1163,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
         deduped_req.preceding_opid->index(),
         request->committed_index()});
 
-    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to;
+    VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
+                                 << ", Last pending opid index: "
+                                 << pending_.GetLastPendingTransactionOpId().index()
+                                 << ", preceding opid index: "
+                                 << deduped_req.preceding_opid->index()
+                                 << ", requested index: " << request->committed_index();
     TRACE("Early marking committed up to index $0", early_apply_up_to);
     CHECK_OK(pending_.AdvanceCommittedIndex(early_apply_up_to));
 
@@ -1319,10 +1326,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     TRACE("finished");
   }
 
-  if (PREDICT_FALSE(VLOG_IS_ON(2))) {
-    VLOG_WITH_PREFIX(2) << "Replica updated."
-        << state_->ToString() << " Request: " << SecureShortDebugString(*request);
-  }
+  VLOG_WITH_PREFIX(2) << "Replica updated. " << state_->ToString()
+                      << ". Request: " << SecureShortDebugString(*request);
 
   TRACE("UpdateReplicas() finished");
   return Status::OK();
@@ -1555,6 +1560,150 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
   return Status::OK();
 }
 
+Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
+                                         TabletServerErrorPB::Code* error_code) {
+  if (PREDICT_FALSE(!req.has_new_config())) {
+    *error_code = TabletServerErrorPB::INVALID_CONFIG;
+    return Status::InvalidArgument("Request must contain 'new_config' argument "
+                                   "to UnsafeChangeConfig()", SecureShortDebugString(req));
+  }
+  if (PREDICT_FALSE(!req.has_caller_id())) {
+    *error_code = TabletServerErrorPB::INVALID_CONFIG;
+    return Status::InvalidArgument("Must specify 'caller_id' argument to UnsafeChangeConfig()",
+                                   SecureShortDebugString(req));
+  }
+
+  // Grab the committed config and current term on this node.
+  int64_t current_term;
+  RaftConfigPB committed_config;
+  int64_t all_replicated_index;
+  int64 last_committed_index;
+  OpId preceding_opid;
+  uint64 msg_timestamp;
+  string local_peer_uuid;
+  {
+    // Take the snapshot of the replica state and queue state so that
+    // we can stick them in the consensus update request later.
+    ReplicaState::UniqueLock lock;
+    RETURN_NOT_OK(state_->LockForRead(&lock));
+    local_peer_uuid = state_->GetPeerUuid();
+    current_term = state_->GetCurrentTermUnlocked();
+    committed_config = state_->GetCommittedConfigUnlocked();
+    if (state_->IsConfigChangePendingUnlocked()) {
+      LOG_WITH_PREFIX_UNLOCKED(WARNING)
+            << "Replica has a pending config, but the new config "
+            << "will be unsafely changed anyway. "
+            << "Currently pending config on the node: "
+            << SecureShortDebugString(state_->GetPendingConfigUnlocked());
+    }
+    all_replicated_index = queue_->GetAllReplicatedIndex();
+    last_committed_index = queue_->GetCommittedIndex();
+    preceding_opid = queue_->GetLastOpIdInLog();
+    msg_timestamp = time_manager_->GetSerialTimestamp().value();
+  }
+
+  // Validate that passed replica uuids are part of the committed config
+  // on this node.  This allows a manual recovery tool to only have to specify
+  // the uuid of each replica in the new config without having to know the
+  // addresses of each server (since we can get the address information from
+  // the committed config). Additionally, only a subset of the committed config
+  // is required for typical cluster repair scenarios.
+  unordered_set<string> retained_peer_uuids;
+  const RaftConfigPB& config = req.new_config();
+  for (const RaftPeerPB& new_peer : config.peers()) {
+    const string& peer_uuid = new_peer.permanent_uuid();
+    retained_peer_uuids.insert(peer_uuid);
+    if (!IsRaftConfigMember(peer_uuid, committed_config)) {
+      *error_code = TabletServerErrorPB::INVALID_CONFIG;
+      return Status::InvalidArgument(Substitute("Peer with uuid $0 is not in the committed  "
+                                                "config on this replica, rejecting the  "
+                                                "unsafe config change request for tablet $1. "
+                                                "Committed config: $2",
+                                                peer_uuid, req.tablet_id(),
+                                                SecureShortDebugString(committed_config)));
+    }
+  }
+
+  RaftConfigPB new_config = committed_config;
+  for (const auto& peer : committed_config.peers()) {
+    const string& peer_uuid = peer.permanent_uuid();
+    if (!ContainsKey(retained_peer_uuids, peer_uuid)) {
+      CHECK(RemoveFromRaftConfig(&new_config, peer_uuid));
+    }
+  }
+  // Check that local peer is part of the new config and is a VOTER.
+  // Although it is valid for a local replica to not have itself
+  // in the committed config, it is rare and a replica without itself
+  // in the latest config is definitely not caught up with the latest leader's log.
+  if (!IsRaftConfigVoter(local_peer_uuid, new_config)) {
+    return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
+                                              "a VOTER in the new config, "
+                                              "rejecting the unsafe config "
+                                              "change request for tablet $1. "
+                                              "Rejected config: $2" ,
+                                              local_peer_uuid, req.tablet_id(),
+                                              SecureShortDebugString(new_config)));
+  }
+  new_config.set_unsafe_config_change(true);
+  int64 replicate_opid_index = preceding_opid.index() + 1;
+  new_config.set_opid_index(replicate_opid_index);
+
+  // Sanity check the new config. 'type' is irrelevant here.
+  Status s = VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM);
+  if (!s.ok()) {
+    *error_code = TabletServerErrorPB::INVALID_CONFIG;
+    return Status::InvalidArgument(Substitute("The resulting new config for tablet $0  "
+                                              "from passed parameters has failed raft "
+                                              "config sanity check: $1",
+                                              req.tablet_id(), s.ToString()));
+  }
+
+  // Prepare the consensus request as if the request is being generated
+  // from a different leader.
+  ConsensusRequestPB consensus_req;
+  ConsensusResponsePB consensus_resp;
+  consensus_req.set_caller_uuid(req.caller_id());
+  // Bumping up the term for the consensus request being generated.
+  // This makes this request appear to come from a new leader that
+  // the local replica doesn't know about yet. If the local replica
+  // happens to be the leader, this will cause it to step down.
+  int64 new_term = current_term + 1;
+  consensus_req.set_caller_term(new_term);
+  consensus_req.mutable_preceding_id()->CopyFrom(preceding_opid);
+  consensus_req.set_committed_index(last_committed_index);
+  consensus_req.set_all_replicated_index(all_replicated_index);
+
+  // Prepare the replicate msg to be replicated.
+  ReplicateMsg* replicate = consensus_req.add_ops();
+  ChangeConfigRecordPB* cc_req = replicate->mutable_change_config_record();
+  cc_req->set_tablet_id(req.tablet_id());
+  *cc_req->mutable_old_config() = committed_config;
+  *cc_req->mutable_new_config() = new_config;
+  OpId* id = replicate->mutable_id();
+  // Bumping up both the term and the opid_index from what's found in the log.
+  id->set_term(new_term);
+  id->set_index(replicate_opid_index);
+  replicate->set_op_type(CHANGE_CONFIG_OP);
+  replicate->set_timestamp(msg_timestamp);
+
+  VLOG_WITH_PREFIX(3) << "UnsafeChangeConfig: Generated consensus request: "
+                      << SecureShortDebugString(consensus_req);
+
+  LOG_WITH_PREFIX(WARNING)
+        << "PROCEEDING WITH UNSAFE CONFIG CHANGE ON THIS SERVER, "
+        << "COMMITTED CONFIG: " << SecureShortDebugString(committed_config)
+        << "NEW CONFIG: " << SecureShortDebugString(new_config);
+
+  s = Update(&consensus_req, &consensus_resp);
+  if (!s.ok() || consensus_resp.has_error()) {
+    *error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+  }
+  if (s.ok() && consensus_resp.has_error()) {
+    s = StatusFromPB(consensus_resp.error().status());
+  }
+  return s;
+}
+
 void RaftConsensus::Shutdown() {
   // Avoid taking locks if already shut down so we don't violate
   // ThreadRestrictions assertions in the case where the RaftConsensus

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 7777719..67c92fd 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -125,6 +125,9 @@ class RaftConsensus : public Consensus,
                       const StatusCallback& client_cb,
                       boost::optional<tserver::TabletServerErrorPB::Code>* error_code) override;
 
+  Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
+                            tserver::TabletServerErrorPB::Code* error_code) override;
+
   Status GetLastOpId(OpIdType type, OpId* id) override;
 
   RaftPeerPB::Role role() const override;

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index a8f70ae..ebd45e6 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -198,10 +198,17 @@ Status ReplicaState::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
   DCHECK(update_lock_.is_locked());
   RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, UNCOMMITTED_QUORUM),
                         "Invalid config to set as pending");
-  CHECK(!cmeta_->has_pending_config())
-      << "Attempt to set pending config while another is already pending! "
-      << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
-      << "Attempted new pending config: " << SecureShortDebugString(new_config);
+  if (!new_config.unsafe_config_change()) {
+    CHECK(!cmeta_->has_pending_config())
+        << "Attempt to set pending config while another is already pending! "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "Attempted new pending config: " << SecureShortDebugString(new_config);
+  } else if (cmeta_->has_pending_config()) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO)
+        << "Allowing unsafe config change even though there is a pending config! "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "New pending config: " << SecureShortDebugString(new_config);
+  }
   cmeta_->set_pending_config(new_config);
   return Status::OK();
 }
@@ -216,24 +223,32 @@ const RaftConfigPB& ReplicaState::GetPendingConfigUnlocked() const {
   return cmeta_->pending_config();
 }
 
-Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& committed_config) {
+Status ReplicaState::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
   TRACE_EVENT0("consensus", "ReplicaState::SetCommittedConfigUnlocked");
   DCHECK(update_lock_.is_locked());
-  DCHECK(committed_config.IsInitialized());
-  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(committed_config, COMMITTED_QUORUM),
+  DCHECK(config_to_commit.IsInitialized());
+  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit, COMMITTED_QUORUM),
                         "Invalid config to set as committed");
 
-  // Compare committed with pending configuration, ensure they are the same.
-  DCHECK(cmeta_->has_pending_config());
-  const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
-  // Quorums must be exactly equal, even w.r.t. peer ordering.
-  CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(), committed_config.SerializeAsString())
-      << Substitute("New committed config must equal pending config, but does not. "
-                    "Pending config: $0, committed config: $1",
-                    SecureShortDebugString(pending_config),
-                    SecureShortDebugString(committed_config));
-
-  cmeta_->set_committed_config(committed_config);
+  // Compare committed with pending configuration, ensure that they are the same.
+  // In the event of an unsafe config change triggered by an administrator,
+  // it is possible that the config being committed may not match the pending config
+  // because unsafe config change allows multiple pending configs to exist.
+  // Therefore we only need to validate that 'config_to_commit' matches the pending config
+  // if the pending config does not have its 'unsafe_config_change' flag set.
+  if (IsConfigChangePendingUnlocked()) {
+    const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
+    if (!pending_config.unsafe_config_change()) {
+      // Quorums must be exactly equal, even w.r.t. peer ordering.
+      CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(),
+               config_to_commit.SerializeAsString())
+          << Substitute("New committed config must equal pending config, but does not. "
+                        "Pending config: $0, committed config: $1",
+                        SecureShortDebugString(pending_config),
+                        SecureShortDebugString(config_to_commit));
+    }
+  }
+  cmeta_->set_committed_config(config_to_commit);
   cmeta_->clear_pending_config();
   CHECK_OK(cmeta_->Flush());
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/consensus/time_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/time_manager.h b/src/kudu/consensus/time_manager.h
index 980e4ad..83b507a 100644
--- a/src/kudu/consensus/time_manager.h
+++ b/src/kudu/consensus/time_manager.h
@@ -129,6 +129,7 @@ class TimeManager : public RefCountedThreadSafe<TimeManager> {
  private:
   FRIEND_TEST(TimeManagerTest, TestTimeManagerNonLeaderMode);
   FRIEND_TEST(TimeManagerTest, TestTimeManagerLeaderMode);
+  friend class RaftConsensus;
 
   // Returns whether we've advanced safe time recently.
   // If this returns false we might be partitioned or there might be election churn.

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index df8e0af..e7b44b3 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -347,6 +347,51 @@ Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                      SecureShortDebugString(cstate), s.ToString()));
 }
 
+Status WaitForReplicasReportedToMaster(
+    const shared_ptr<master::MasterServiceProxy>& master_proxy,
+    int num_replicas, const string& tablet_id,
+    const MonoDelta& timeout,
+    WaitForLeader wait_for_leader,
+    bool* has_leader,
+    master::TabletLocationsPB* tablet_locations) {
+  MonoTime deadline(MonoTime::Now() + timeout);
+  while (true) {
+    RETURN_NOT_OK(GetTabletLocations(master_proxy, tablet_id, timeout, tablet_locations));
+    *has_leader = false;
+    if (tablet_locations->replicas_size() == num_replicas) {
+      for (const master::TabletLocationsPB_ReplicaPB& replica :
+                    tablet_locations->replicas()) {
+        if (replica.role() == RaftPeerPB::LEADER) {
+          *has_leader = true;
+        }
+      }
+      if (wait_for_leader == DONT_WAIT_FOR_LEADER ||
+          (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) {
+        break;
+      }
+    }
+    if (deadline < MonoTime::Now()) {
+      return Status::TimedOut(Substitute("Timed out after waiting "
+          "for tablet $1 expected to report master with $2 replicas, has_leader: $3",
+          tablet_id, num_replicas, *has_leader));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(20));
+  }
+  if (num_replicas != tablet_locations->replicas_size()) {
+      return Status::NotFound(Substitute("Number of replicas for tablet $0 "
+          "reported to master $1:$2",
+          tablet_id, tablet_locations->replicas_size(),
+          SecureDebugString(*tablet_locations)));
+  }
+  if (wait_for_leader == WAIT_FOR_LEADER && !(*has_leader)) {
+    return Status::NotFound(Substitute("Leader for tablet $0 not found on master, "
+                                       "number of replicas $1:$2",
+                                       tablet_id, tablet_locations->replicas_size(),
+                                       SecureDebugString(*tablet_locations)));
+  }
+  return Status::OK();
+}
+
 Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
                                      TServerDetails* replica,
                                      const string& tablet_id,
@@ -450,6 +495,22 @@ Status FindTabletLeader(const TabletServerMap& tablet_servers,
                                      s.ToString()));
 }
 
+Status FindTabletFollowers(const TabletServerMap& tablet_servers,
+                           const string& tablet_id,
+                           const MonoDelta& timeout,
+                           vector<TServerDetails*>* followers) {
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers, &tservers);
+  TServerDetails* leader;
+  RETURN_NOT_OK(FindTabletLeader(tablet_servers, tablet_id, timeout, &leader));
+  for (TServerDetails* ts : tservers) {
+    if (ts->uuid() != leader->uuid()) {
+      followers->push_back(ts);
+    }
+  }
+  return Status::OK();
+}
+
 Status StartElection(const TServerDetails* replica,
                      const string& tablet_id,
                      const MonoDelta& timeout) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 2d73661..01be8c8 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -156,6 +156,22 @@ Status WaitUntilCommittedConfigOpIdIndexIs(int64_t opid_index,
                                            const std::string& tablet_id,
                                            const MonoDelta& timeout);
 
+enum WaitForLeader {
+  DONT_WAIT_FOR_LEADER = 0,
+  WAIT_FOR_LEADER = 1
+};
+
+// Wait for the specified number of replicas to be reported by the master for
+// the given tablet. Fails when leader is not found or number of replicas
+// did not match up, or timeout waiting for leader.
+Status WaitForReplicasReportedToMaster(
+    const std::shared_ptr<master::MasterServiceProxy>& master_proxy,
+    int num_replicas, const std::string& tablet_id,
+    const MonoDelta& timeout,
+    WaitForLeader wait_for_leader,
+    bool* has_leader,
+    master::TabletLocationsPB* tablet_locations);
+
 // Wait until the last commited OpId has index exactly 'opid_index'.
 Status WaitUntilCommittedOpIdIndexIs(int64_t opid_index,
                                      TServerDetails* replica,
@@ -182,6 +198,12 @@ Status FindTabletLeader(const TabletServerMap& tablet_servers,
                         const MonoDelta& timeout,
                         TServerDetails** leader);
 
+// Grabs list of followers using FindTabletLeader() above.
+Status FindTabletFollowers(const TabletServerMap& tablet_servers,
+                           const string& tablet_id,
+                           const MonoDelta& timeout,
+                           vector<TServerDetails*>* followers);
+
 // Start an election on the specified tserver.
 // 'timeout' only refers to the RPC asking the peer to start an election. The
 // StartElection() RPC does not block waiting for the results of the election,

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index d9ecc5f..579e3f3 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -79,8 +79,6 @@ using itest::RemoveServer;
 using itest::StartElection;
 using itest::WaitUntilLeader;
 using itest::WriteSimpleTestRow;
-using master::GetTabletLocationsRequestPB;
-using master::GetTabletLocationsResponsePB;
 using master::TabletLocationsPB;
 using rpc::RpcController;
 using server::SetFlagRequestPB;
@@ -340,23 +338,6 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
                                                    const string& leader_uuid);
 
-  // Return the replicas of the specified 'tablet_id', as seen by the Master.
-  Status GetTabletLocations(const string& tablet_id, const MonoDelta& timeout,
-                            master::TabletLocationsPB* tablet_locations);
-
-  enum WaitForLeader {
-    NO_WAIT_FOR_LEADER = 0,
-    WAIT_FOR_LEADER = 1
-  };
-
-  // Wait for the specified number of replicas to be reported by the master for
-  // the given tablet. Fails with an assertion if the timeout expires.
-  void WaitForReplicasReportedToMaster(int num_replicas, const string& tablet_id,
-                                       const MonoDelta& timeout,
-                                       WaitForLeader wait_for_leader,
-                                       bool* has_leader,
-                                       master::TabletLocationsPB* tablet_locations);
-
   void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
   void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
   void CreateClusterForCrashyNodesTests();
@@ -1743,60 +1724,6 @@ void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
                                MonoDelta::FromSeconds(10)));
 }
 
-// Return the replicas of the specified 'tablet_id', as seen by the Master.
-Status RaftConsensusITest::GetTabletLocations(const string& tablet_id, const MonoDelta& timeout,
-                                              master::TabletLocationsPB* tablet_locations) {
-  RpcController rpc;
-  rpc.set_timeout(timeout);
-  GetTabletLocationsRequestPB req;
-  *req.add_tablet_ids() = tablet_id;
-  GetTabletLocationsResponsePB resp;
-  RETURN_NOT_OK(cluster_->master_proxy()->GetTabletLocations(req, &resp, &rpc));
-  if (resp.has_error()) {
-    return StatusFromPB(resp.error().status());
-  }
-  if (resp.errors_size() > 0) {
-    CHECK_EQ(1, resp.errors_size()) << SecureShortDebugString(resp);
-    CHECK_EQ(tablet_id, resp.errors(0).tablet_id()) << SecureShortDebugString(resp);
-    return StatusFromPB(resp.errors(0).status());
-  }
-  CHECK_EQ(1, resp.tablet_locations_size()) << SecureShortDebugString(resp);
-  *tablet_locations = resp.tablet_locations(0);
-  return Status::OK();
-}
-
-void RaftConsensusITest::WaitForReplicasReportedToMaster(
-    int num_replicas, const string& tablet_id,
-    const MonoDelta& timeout,
-    WaitForLeader wait_for_leader,
-    bool* has_leader,
-    master::TabletLocationsPB* tablet_locations) {
-  MonoTime deadline(MonoTime::Now() + timeout);
-  while (true) {
-    ASSERT_OK(GetTabletLocations(tablet_id, timeout, tablet_locations));
-    *has_leader = false;
-    if (tablet_locations->replicas_size() == num_replicas) {
-      for (const master::TabletLocationsPB_ReplicaPB& replica :
-                    tablet_locations->replicas()) {
-        if (replica.role() == RaftPeerPB::LEADER) {
-          *has_leader = true;
-        }
-      }
-      if (wait_for_leader == NO_WAIT_FOR_LEADER ||
-          (wait_for_leader == WAIT_FOR_LEADER && *has_leader)) {
-        break;
-      }
-    }
-    if (deadline < MonoTime::Now()) break;
-    SleepFor(MonoDelta::FromMilliseconds(20));
-  }
-  ASSERT_EQ(num_replicas, tablet_locations->replicas_size())
-      << SecureDebugString(*tablet_locations);
-  if (wait_for_leader == WAIT_FOR_LEADER) {
-    ASSERT_TRUE(*has_leader) << SecureDebugString(*tablet_locations);
-  }
-}
-
 // Basic test of adding and removing servers from a configuration.
 TEST_F(RaftConsensusITest, TestAddRemoveServer) {
   MonoDelta kTimeout = MonoDelta::FromSeconds(10);
@@ -2264,8 +2191,9 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, WAIT_FOR_LEADER,
-                                            &has_leader, &tablet_locations));
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   2, tablet_id, timeout, itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
@@ -2283,8 +2211,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
   // Wait for the master to be notified of the config change.
   // It should continue to have the same leader, even without waiting.
   LOG(INFO) << "Waiting for Master to see config change...";
-  NO_FATALS(WaitForReplicasReportedToMaster(3, tablet_id, timeout, NO_WAIT_FOR_LEADER,
-                                            &has_leader, &tablet_locations));
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id, timeout,
+                                                   itest::DONT_WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 
@@ -2297,8 +2227,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
 
   // Wait for the master to be notified of the removal.
   LOG(INFO) << "Waiting for Master to see config change...";
-  NO_FATALS(WaitForReplicasReportedToMaster(2, tablet_id, timeout, NO_WAIT_FOR_LEADER,
-                                            &has_leader, &tablet_locations));
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   2, tablet_id, timeout,
+                                                   itest::DONT_WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index 1a34718..a414c25 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -21,8 +21,10 @@
 #include <gtest/gtest.h>
 
 #include "kudu/client/client.h"
+#include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/integration-tests/ts_itest-base.h"
 #include "kudu/tools/tool_test_util.h"
@@ -38,6 +40,7 @@ using client::KuduSchema;
 using client::KuduTableCreator;
 using client::sp::shared_ptr;
 using consensus::ConsensusStatePB;
+using consensus::OpId;
 using itest::TabletServerMap;
 using itest::TServerDetails;
 using std::string;
@@ -155,6 +158,793 @@ TEST_F(AdminCliTest, TestChangeConfig) {
                                                 MonoDelta::FromSeconds(10)));
 }
 
+Status RunUnsafeChangeConfig(const string& tablet_id,
+                             const string& dst_host,
+                             vector<string> peer_uuid_list) {
+  vector<string> command_args = { GetKuduCtlAbsolutePath(), "remote_replica",
+      "unsafe_change_config", dst_host, tablet_id };
+  for (const auto& peer_uuid : peer_uuid_list) {
+    command_args.push_back(peer_uuid);
+  }
+  return Subprocess::Call(command_args);
+}
+
+// Test unsafe config change when there is one follower survivor in the cluster.
+// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down leader and follower1.
+// 3. Trigger unsafe config change on follower2 having follower2 in the config.
+// 4. Wait until the new config is populated on follower2(new leader) and master.
+// 5. Bring up leader and follower1 and verify replicas are deleted.
+// 6. Verify that new config doesn't contain old leader and follower1.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  // tserver_unresponsive_timeout_ms is useful so that master considers
+  // the live tservers for tablet re-replication.
+  NO_FATALS(BuildAndStart({}, {}));
+
+  LOG(INFO) << "Finding tablet leader and waiting for things to start...";
+  string tablet_id = tablet_replicas_.begin()->first;
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+
+  // Replace the config on follower1 after shutting down follower2 and leader.
+  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
+  // Restart master to cleanup cache of dead servers from its list of candidate
+  // servers to trigger placement of new replicas on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid();
+  const string& follower1_addr = Substitute("$0:$1",
+                                            followers[0]->registration.rpc_addresses(0).host(),
+                                            followers[0]->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(followers[0]->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
+
+  // Check that new config is populated to a new follower.
+  vector<TServerDetails*> all_tservers;
+  TServerDetails *new_follower = nullptr;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  for (const auto& ts :all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_follower = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_follower != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 3 voters on follower1.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+
+  // Verify that two new servers are part of new config and old
+  // servers are gone.
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
+  }
+
+  // Also verify that when we bring back followers[1] and leader,
+  // we should see the tablet in TOMBSTONED state on these servers.
+  ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
+  ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
+  ASSERT_OK(itest::WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
+  ASSERT_OK(itest::WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
+}
+
+// Test unsafe config change when there is one leader survivor in the cluster.
+// 1. Instantiate external mini cluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down both followers.
+// 3. Trigger unsafe config change on leader having leader in the config.
+// 4. Wait until the new config is populated on leader and master.
+// 5. Verify that new config does not contain old followers.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart({}, {}));
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+
+  // Shut down servers follower1 and follower2,
+  // so that we can force new config on remaining leader.
+  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
+  // Restart master to cleanup cache of dead servers from its list of candidate
+  // servers to trigger placement of new replicas on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  LOG(INFO) << "Forcing unsafe config change on tserver " << leader_ts->uuid();
+  const string& leader_addr = Substitute("$0:$1",
+                                         leader_ts->registration.rpc_addresses(0).host(),
+                                         leader_ts->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(leader_ts->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
+
+  // Check that new config is populated to a new follower.
+  vector<TServerDetails*> all_tservers;
+  TServerDetails *new_follower = nullptr;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  for (const auto& ts :all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_follower = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_follower != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 3 voters on new_follower.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
+  }
+}
+
+// Test unsafe config change when the unsafe config contains 2 nodes.
+// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down leader.
+// 3. Trigger unsafe config change on follower1 having follower1 and follower2 in the config.
+// 4. Wait until the new config is populated on new_leader and master.
+// 5. Verify that new config does not contain old leader.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 4;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart({}, {}));
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+
+  // Shut down leader and prepare 2-node config.
+  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
+  // Restart master to cleanup cache of dead servers from its list of candidate
+  // servers to trigger placement of new replicas on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
+  const string& follower1_addr = Substitute("$0:$1",
+                                            followers[1]->registration.rpc_addresses(0).host(),
+                                            followers[1]->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(followers[0]->uuid());
+  peer_uuid_list.push_back(followers[1]->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 3 voters on follower1.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
+  }
+}
+
+// Test unsafe config change on a 5-replica tablet when the unsafe config contains 2 nodes.
+// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 8 TS.
+// 2. Shut down leader and 2 followers.
+// 3. Trigger unsafe config change on a surviving follower with those
+//    2 surviving followers in the new config.
+// 4. Wait until the new config is populated on new_leader and master.
+// 5. Verify that new config does not contain old leader and old followers.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 8;
+  FLAGS_num_replicas = 5;
+  // Retire the dead servers early with these settings.
+  NO_FATALS(BuildAndStart({}, {}));
+
+  vector<TServerDetails*> tservers;
+  vector<ExternalTabletServer*> external_tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  for (TServerDetails* ts : tservers) {
+    external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid()));
+  }
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   5, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+  ASSERT_EQ(followers.size(), 4);
+  cluster_->tablet_server_by_uuid(followers[2]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[3]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
+  // Restart the master to clear its cache of dead servers from its list of
+  // candidate servers, so that new replicas get placed on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
+  const string& follower1_addr = Substitute("$0:$1",
+                                         followers[1]->registration.rpc_addresses(0).host(),
+                                         followers[1]->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(followers[0]->uuid());
+  peer_uuid_list.push_back(followers[1]->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 5 voters back on new_node.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   5, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), leader_ts->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[2]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[3]->uuid());
+  }
+}
+
+// Test unsafe config change when there is a pending config on a surviving leader.
+// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down both the followers.
+// 3. Trigger a regular config change on the leader which remains pending on leader.
+// 4. Trigger unsafe config change on the surviving leader.
+// 5. Wait until the new config is populated on leader and master.
+// 6. Verify that the new config does not contain the old followers and that a
+//    standby node has been populated with the new config.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart({}, {}));
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+  ASSERT_EQ(followers.size(), 2);
+
+  // Shut down servers follower1 and follower2,
+  // so that leader can't replicate future config change ops.
+  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
+
+  // Now try to replicate a ChangeConfig operation. This should get stuck and time out
+  // because the server can't replicate any operations.
+  Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
+                          -1, MonoDelta::FromSeconds(2),
+                          nullptr);
+  ASSERT_TRUE(s.IsTimedOut());
+
+  LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
+            << "command when change config op is pending on the leader.";
+  const string& leader_addr = Substitute("$0:$1",
+                                         leader_ts->registration.rpc_addresses(0).host(),
+                                         leader_ts->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(leader_ts->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
+
+  // Restart the master to clear its cache of dead servers from its list of
+  // candidate servers, so that new replicas get placed on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 3 voters on new_node.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
+  }
+}
+
+// Test unsafe config change when there is a pending config on a surviving follower.
+// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down both the followers.
+// 3. Trigger a regular config change on the leader which remains pending on leader.
+// 4. Trigger a leader_step_down command such that leader is forced to become follower.
+// 5. Trigger unsafe config change on the follower.
+// 6. Wait until the new config is populated on leader and master.
+// 7. Verify that the new config does not contain the old followers and that a
+//    standby node has been populated with the new config.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart({}, {}));
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+
+  // Shut down servers follower1 and follower2,
+  // so that leader can't replicate future config change ops.
+  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
+  // Restart the master to clear its cache of dead servers from its
+  // list of candidate servers used to place the new replicas.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  // Now try to replicate a ChangeConfig operation. This should get stuck and time out
+  // because the server can't replicate any operations.
+  Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
+                          -1, MonoDelta::FromSeconds(2),
+                          nullptr);
+  ASSERT_TRUE(s.IsTimedOut());
+
+  // Force the leader to step down. This is a best-effort command, since
+  // leadership could change at any time during the cluster's lifetime.
+  string stderr;
+  s = Subprocess::Call({GetKuduCtlAbsolutePath(), "tablet", "leader_step_down",
+                        cluster_->master()->bound_rpc_addr().ToString(),
+                        tablet_id_},
+                       "", nullptr, &stderr);
+  bool not_currently_leader = stderr.find(
+      Status::IllegalState("").CodeAsString()) != string::npos;
+  ASSERT_TRUE(s.ok() || not_currently_leader);
+
+  LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
+            << "command when change config op is pending on the leader.";
+  const string& leader_addr = Substitute("$0:$1",
+                                         leader_ts->registration.rpc_addresses(0).host(),
+                                         leader_ts->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(leader_ts->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 3 voters on new_node.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
+  }
+}
+
+// Test unsafe config change when there are back to back pending configs on leader logs.
+// 1. Instantiate external minicluster with 1 tablet having 3 replicas and 5 TS.
+// 2. Shut down both the followers.
+// 3. Trigger a regular config change on the leader which remains pending on leader.
+// 4. Set a fault crash flag to trigger upon next commit of config change.
+// 5. Trigger unsafe config change on the surviving leader which should trigger
+//    the fault while the old config change is being committed.
+// 6. Shutdown and restart the leader and verify that tablet bootstrapped on leader.
+// 7. Verify that a new node has populated the new config with 3 voters.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart({}, {}));
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+
+  // Shut down servers follower1 and follower2,
+  // so that leader can't replicate future config change ops.
+  cluster_->tablet_server_by_uuid(followers[0]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
+
+  // Now try to replicate a ChangeConfig operation. This should get stuck and time out
+  // because the server can't replicate any operations.
+  Status s = RemoveServer(leader_ts, tablet_id_, followers[1],
+                          -1, MonoDelta::FromSeconds(2),
+                          nullptr);
+  ASSERT_TRUE(s.IsTimedOut());
+
+  LOG(INFO) << "Change Config Op timed out, Sending a Replace config "
+            << "command when change config op is pending on the leader.";
+  const string& leader_addr = Substitute("$0:$1",
+                                         leader_ts->registration.rpc_addresses(0).host(),
+                                         leader_ts->registration.rpc_addresses(0).port());
+  vector<string> peer_uuid_list;
+  peer_uuid_list.push_back(leader_ts->uuid());
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
+
+  // Inject the crash via fault_crash_before_cmeta_flush flag.
+  // Tablet will find 2 pending configs back to back during bootstrap,
+  // one from ChangeConfig (RemoveServer) and another from UnsafeChangeConfig.
+  ASSERT_OK(cluster_->SetFlag(
+      cluster_->tablet_server_by_uuid(leader_ts->uuid()),
+      "fault_crash_before_cmeta_flush", "1.0"));
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+  // Restart the master to clear its cache of dead servers from its list of
+  // candidate servers, so that new replicas get placed on healthy servers.
+  cluster_->master()->Shutdown();
+  ASSERT_OK(cluster_->master()->Restart());
+
+  ASSERT_OK(cluster_->tablet_server_by_uuid(
+      leader_ts->uuid())->WaitForInjectedCrash(kTimeout));
+
+  cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
+  ASSERT_OK(cluster_->tablet_server_by_uuid(
+      leader_ts->uuid())->Restart());
+  ASSERT_OK(WaitForNumTabletsOnTS(leader_ts, 1, kTimeout, nullptr));
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   3, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[0]->uuid());
+    ASSERT_NE(replica.ts_info().permanent_uuid(), followers[1]->uuid());
+  }
+}
+
+// Test unsafe config change on a 5-replica tablet when there are multiple
+// pending configs on the surviving node.
+// 1. Instantiate external minicluster with 1 tablet having 5 replicas and 9 TS.
+// 2. Shut down all the followers.
+// 3. Trigger unsafe config changes on the surviving leader with those
+//    dead followers in the new config.
+// 4. Wait until the new config is populated on the master and the new leader.
+// 5. Verify that new config does not contain old followers.
+TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  FLAGS_num_tablet_servers = 9;
+  FLAGS_num_replicas = 5;
+  // Retire the dead servers early with these settings.
+  NO_FATALS(BuildAndStart({}, {}));
+
+  vector<TServerDetails*> tservers;
+  vector<ExternalTabletServer*> external_tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  for (TServerDetails* ts : tservers) {
+    external_tservers.push_back(cluster_->tablet_server_by_uuid(ts->uuid()));
+  }
+
+  // Determine the list of tablet servers currently in the config.
+  TabletServerMap active_tablet_servers;
+  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  for (auto it = iter.first; it != iter.second; ++it) {
+    InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
+  }
+
+  // Get a baseline config reported to the master.
+  LOG(INFO) << "Waiting for Master to see the current replicas...";
+  master::TabletLocationsPB tablet_locations;
+  bool has_leader;
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   5, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
+
+  // Wait for initial NO_OP to be committed by the leader.
+  TServerDetails* leader_ts;
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
+  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  vector<TServerDetails*> followers;
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+  ASSERT_EQ(followers.size(), 4);
+  for (int i = 0; i < followers.size(); i++) {
+    cluster_->tablet_server_by_uuid(followers[i]->uuid())->Shutdown();
+  }
+  // Shut down the master to clear its cache of dead servers from its list of
+  // candidate servers, so new replicas get placed on healthy servers when it restarts later.
+  cluster_->master()->Shutdown();
+
+  LOG(INFO) << "Forcing unsafe config change on tserver " << followers[1]->uuid();
+  const string& leader_addr = Substitute("$0:$1",
+                                         leader_ts->registration.rpc_addresses(0).host(),
+                                         leader_ts->registration.rpc_addresses(0).port());
+
+  // This should leave multiple pending configs on the node, since we first
+  // include all the dead followers in the new config and then progressively
+  // shrink it down to just the one surviving node.
+  // The config write sequence is: {ABCDE}, {ABCD}, {ABC}, {AB}, {A},
+  // where A is the leader node the configs are written to and the rest of the
+  // nodes are dead followers.
+  for (int num_replicas = followers.size(); num_replicas >= 0; num_replicas--) {
+    vector<string> peer_uuid_list;
+    peer_uuid_list.push_back(leader_ts->uuid());
+    for (int i = 0; i < num_replicas; i++) {
+      peer_uuid_list.push_back(followers[i]->uuid());
+    }
+    ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, leader_addr, peer_uuid_list));
+  }
+
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(cluster_->master()->Restart());
+
+  // Find a remaining node which will be picked for re-replication.
+  vector<TServerDetails*> all_tservers;
+  AppendValuesFromMap(tablet_servers_, &all_tservers);
+  TServerDetails* new_node = nullptr;
+  for (TServerDetails* ts : all_tservers) {
+    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
+      new_node = ts;
+      break;
+    }
+  }
+  ASSERT_TRUE(new_node != nullptr);
+
+  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
+  // so it is safer to wait until consensus metadata has 5 voters on new_node.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(5, new_node, tablet_id_, kTimeout));
+
+  // Wait for the master to be notified of the config change.
+  LOG(INFO) << "Waiting for Master to see new config...";
+  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                                   5, tablet_id_, kTimeout,
+                                                   itest::WAIT_FOR_LEADER,
+                                                   &has_leader, &tablet_locations));
+  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  for (const master::TabletLocationsPB_ReplicaPB& replica :
+      tablet_locations.replicas()) {
+    // Verify that old followers aren't part of new config.
+    for (const auto& old_follower : followers) {
+      ASSERT_NE(replica.ts_info().permanent_uuid(), old_follower->uuid());
+    }
+  }
+}
+
 Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
                             const string& tablet_id,
                             int64 *current_term) {

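[Editor's note] A minimal, purely illustrative sketch of how a test could shell out to the new
'kudu remote_replica unsafe_change_config' action (mirroring the leader_step_down invocation
above) is shown below. The helper name and plumbing here are assumptions for illustration only
and are not part of the patch; the tests above use their own RunUnsafeChangeConfig() helper.

    // Illustrative sketch only -- not part of the patch. Argument order follows
    // the ActionBuilder declaration in tool_action_remote_replica.cc:
    //   <tserver address> <tablet id> <peer uuids...>
    Status RunUnsafeChangeConfigSketch(const string& tablet_id,
                                       const string& tserver_address,
                                       const vector<string>& peer_uuid_list) {
      vector<string> args = { GetKuduCtlAbsolutePath(),
                              "remote_replica", "unsafe_change_config",
                              tserver_address, tablet_id };
      // Append the UUIDs of the peers that should form the new config.
      args.insert(args.end(), peer_uuid_list.begin(), peer_uuid_list.end());
      string stderr;
      return Subprocess::Call(args, "", nullptr, &stderr);
    }
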
http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index e6c241a..68c4493 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -378,10 +378,11 @@ TEST_F(ToolTest, TestModeHelp) {
   {
     const vector<string> kRemoteReplicaModeRegexes = {
         "check.*Check if all tablet replicas",
-        "copy.*Copy a tablet replica from one Kudu Tablet Server to another",
+        "copy.*Copy a tablet replica from one Kudu Tablet Server",
         "delete.*Delete a tablet replica",
         "dump.*Dump the data of a tablet replica",
-        "list.*List all tablet replicas"
+        "list.*List all tablet replicas",
+        "unsafe_change_config.*Force the specified replica to adopt"
     };
     NO_FATALS(RunTestHelp("remote_replica", kRemoteReplicaModeRegexes));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/80026a34/src/kudu/tools/tool_action_remote_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index a7435ed..9eb1674 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -31,6 +31,7 @@
 #include "kudu/client/scanner-internal.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/common/partition.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
@@ -61,6 +62,8 @@ using client::KuduRowResult;
 using client::KuduScanBatch;
 using client::KuduSchema;
 using consensus::ConsensusServiceProxy;
+using consensus::RaftConfigPB;
+using consensus::RaftPeerPB;
 using consensus::StartTabletCopyRequestPB;
 using consensus::StartTabletCopyResponsePB;
 using rpc::RpcController;
@@ -155,6 +158,8 @@ const char* const kTServerAddressDesc = "Address of a Kudu Tablet Server of "
     "to the default port.";
 const char* const kSrcAddressArg = "src_address";
 const char* const kDstAddressArg = "dst_address";
+const char* const kPeerUUIDsArg = "peer uuids";
+const char* const kPeerUUIDsArgDesc = "List of peer uuids to be part of new config";
 
 Status GetReplicas(TabletServerServiceProxy* proxy,
                    vector<ListTabletsResponsePB::StatusAndSchemaPB>* replicas) {
@@ -338,6 +343,43 @@ Status CopyReplica(const RunnerContext& context) {
   return Status::OK();
 }
 
+Status UnsafeChangeConfig(const RunnerContext& context) {
+  // Parse and validate arguments.
+  const string& dst_address = FindOrDie(context.required_args, kTServerAddressArg);
+  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+  ServerStatusPB dst_status;
+  RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
+                                &dst_status));
+
+  if (context.variadic_args.empty()) {
+    return Status::InvalidArgument("No peer UUIDs specified for the new config");
+  }
+
+  RaftConfigPB new_config;
+  for (const auto& arg : context.variadic_args) {
+    RaftPeerPB new_peer;
+    new_peer.set_permanent_uuid(arg);
+    new_config.add_peers()->CopyFrom(new_peer);
+  }
+
+  // Send the UnsafeChangeConfig request to the server at dst_address.
+  unique_ptr<ConsensusServiceProxy> proxy;
+  RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort, &proxy));
+  consensus::UnsafeChangeConfigRequestPB req;
+  consensus::UnsafeChangeConfigResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
+  req.set_dest_uuid(dst_status.node_instance().permanent_uuid());
+  req.set_tablet_id(tablet_id);
+  req.set_caller_id("kudu-tools");
+  *req.mutable_new_config() = new_config;
+  RETURN_NOT_OK(proxy->UnsafeChangeConfig(req, &resp, &rpc));
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 unique_ptr<Mode> BuildRemoteReplicaMode() {
@@ -377,6 +419,17 @@ unique_ptr<Mode> BuildRemoteReplicaMode() {
       .AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
       .Build();
 
+  unique_ptr<Action> unsafe_change_config =
+      ActionBuilder("unsafe_change_config", &UnsafeChangeConfig)
+      .Description("Force the specified replica to adopt a new Raft config")
+      .ExtraDescription("The members of the new Raft config must be a subset "
+                        "of (or the same as) the members of the existing "
+                        "committed Raft config on that replica.")
+      .AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
+      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddRequiredVariadicParameter({ kPeerUUIDsArg, kPeerUUIDsArgDesc })
+      .Build();
+
   return ModeBuilder("remote_replica")
       .Description("Operate on remote tablet replicas on a Kudu Tablet Server")
       .AddAction(std::move(check_replicas))
@@ -384,6 +437,7 @@ unique_ptr<Mode> BuildRemoteReplicaMode() {
       .AddAction(std::move(delete_replica))
       .AddAction(std::move(dump_replica))
       .AddAction(std::move(list))
+      .AddAction(std::move(unsafe_change_config))
       .Build();
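
[Editor's note] Given the parameters declared in the ActionBuilder above (tserver address,
tablet id, then a variadic list of peer UUIDs), the new action is expected to be invoked
roughly as follows, where the address, tablet id, and UUIDs are placeholders:

    kudu remote_replica unsafe_change_config <tserver_address> <tablet_id> <peer_uuid_1> [<peer_uuid_2> ...]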
 }