You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/11/29 07:21:22 UTC

[2/3] kudu git commit: KUDU-1097 (patch 5a): Implement a bulk config change API

KUDU-1097 (patch 5a): Implement a bulk config change API

A "bulk" config change API is required to support simultaneously
modifying attributes on more than one peer in a config, such as when we
want to move a replica from one location to another.

The "traditional" config change API has been re-routed through the bulk
API so that we get some basic test coverage from the existing tests.

In addition to adding the bulk API, the following changes were made to
the MODIFY_PEER config change API, which is currently unused:

 * The 'member_type' field is no longer required to be modified by a
   MODIFY_PEER config change operation, but *something* still must be
   modified to allow it to go through. This is enforced by checking the
   "before" and "after" RaftPeerPB instances with MessageDifferencer.
 * We now allow modifying the leader replica's attributes (but still
   not its 'member_type' field).

A functional test was added to verify the new functionality of the bulk
change API.

Change-Id: I928a1622d48049c9ad3223b76549a2822bccc30c
Reviewed-on: http://gerrit.cloudera.org:8080/8644
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/eb0718e1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/eb0718e1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/eb0718e1

Branch: refs/heads/master
Commit: eb0718e16895b4f851d232f26662b6c16ec51b2f
Parents: 3f65862
Author: Mike Percy <mp...@apache.org>
Authored: Sun Nov 26 23:31:00 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Nov 29 07:20:45 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus.proto              |  30 +++
 src/kudu/consensus/raft_consensus.cc            | 251 ++++++++++++-------
 src/kudu/consensus/raft_consensus.h             |   5 +
 .../integration-tests/cluster_itest_util.cc     |  55 ++++
 src/kudu/integration-tests/cluster_itest_util.h |  17 ++
 .../raft_config_change-itest.cc                 | 229 +++++++++++++++++
 .../raft_consensus_nonvoter-itest.cc            |   9 +-
 src/kudu/tserver/tablet_service.cc              |  25 ++
 src/kudu/tserver/tablet_service.h               |   5 +
 9 files changed, 537 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 6ff4186..1c7f56c 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -151,6 +151,30 @@ message ChangeConfigRequestPB {
   optional int64 cas_config_opid_index = 5;
 }
 
+// A config change request that specifies many things to change.
+message BulkChangeConfigRequestPB {
+  // Each sub-request that is being made as part of this bulk change config request.
+  message ConfigChangeItemPB {
+    // The type of config change.
+    optional ChangeConfigType type = 1;
+    // The peer to add, remove, or modify.
+    // The same rules apply here as above in ChangeConfigRequestPB.
+    optional RaftPeerPB peer = 2;
+  }
+
+  // UUID of server this request is addressed to.
+  optional bytes dest_uuid = 1;
+
+  required bytes tablet_id = 2;
+
+  repeated ConfigChangeItemPB config_changes = 3;
+
+  // The OpId index of the committed config to replace.
+  // This parameter serves the same purpose as specified in
+  // ChangeConfigRequestPB.
+  optional int64 cas_config_opid_index = 4;
+}
+
 // The configuration change response. If any immediate error occurred
 // the 'error' field is set with it, otherwise 'new_configuration' is set.
 message ChangeConfigResponsePB {
@@ -528,6 +552,12 @@ service ConsensusService {
   // An OK response means the operation was successful.
   rpc ChangeConfig(ChangeConfigRequestPB) returns (ChangeConfigResponsePB);
 
+  // Implements a bulk config change interface that allows for multiple peers
+  // to be modified at once, including setting attributes and adding or
+  // removing various peers, as long as no more than one voter is added,
+  // removed, promoted, or demoted in a single operation ("one by one" rule).
+  rpc BulkChangeConfig(BulkChangeConfigRequestPB) returns (ChangeConfigResponsePB);
+
   // Implements unsafe config change operation for manual recovery use cases.
   rpc UnsafeChangeConfig(UnsafeChangeConfigRequestPB) returns (UnsafeChangeConfigResponsePB);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cdfb0fb..291c146 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -31,6 +31,7 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
+#include <google/protobuf/util/message_differencer.h>
 
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
@@ -144,12 +145,14 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
                           "each time a leader election is started.");
 
 using boost::optional;
+using google::protobuf::util::MessageDifferencer;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::PeriodicTimer;
 using kudu::tserver::TabletServerErrorPB;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::weak_ptr;
 using strings::Substitute;
 
@@ -1635,17 +1638,32 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
-  if (PREDICT_FALSE(!req.has_type())) {
-    return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()",
-                                   SecureShortDebugString(req));
+  BulkChangeConfigRequestPB bulk_req;
+  *bulk_req.mutable_tablet_id() = req.tablet_id();
+
+  if (req.has_dest_uuid()) {
+    *bulk_req.mutable_dest_uuid() = req.dest_uuid();
   }
-  if (PREDICT_FALSE(!req.has_server())) {
-    *error_code = TabletServerErrorPB::INVALID_CONFIG;
-    return Status::InvalidArgument("Must specify 'server' argument to ChangeConfig()",
-                                   SecureShortDebugString(req));
+  if (req.has_cas_config_opid_index()) {
+    bulk_req.set_cas_config_opid_index(req.cas_config_opid_index());
+  }
+  auto* change = bulk_req.add_config_changes();
+  if (req.has_type()) {
+    change->set_type(req.type());
+  }
+  if (req.has_server()) {
+    *change->mutable_peer() = req.server();
   }
-  ChangeConfigType type = req.type();
-  const RaftPeerPB& server = req.server();
+
+  return BulkChangeConfig(bulk_req, std::move(client_cb), error_code);
+}
+
+Status RaftConsensus::BulkChangeConfig(const BulkChangeConfigRequestPB& req,
+                                       StdStatusCallback client_cb,
+                                       boost::optional<TabletServerErrorPB::Code>* error_code) {
+  TRACE_EVENT2("consensus", "RaftConsensus::BulkChangeConfig",
+               "peer", peer_uuid(),
+               "tablet", options_.tablet_id);
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
@@ -1660,11 +1678,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
       return Status::IllegalState("Leader has not yet committed an operation in its own term");
     }
 
-    if (!server.has_permanent_uuid()) {
-      return Status::InvalidArgument("server must have permanent_uuid specified",
-                                     SecureShortDebugString(req));
-    }
-    RaftConfigPB committed_config = cmeta_->CommittedConfig();
+    const RaftConfigPB committed_config = cmeta_->CommittedConfig();
 
     // Support atomic ChangeConfig requests.
     if (req.has_cas_config_opid_index()) {
@@ -1678,82 +1692,151 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
       }
     }
 
+    // 'new_config' will be modified in-place and validated before being used
+    // as the new Raft configuration.
     RaftConfigPB new_config = committed_config;
-    new_config.clear_opid_index();
-    const string& server_uuid = server.permanent_uuid();
-    switch (type) {
-      case ADD_PEER:
-        // Ensure the server we are adding is not already a member of the configuration.
-        if (IsRaftConfigMember(server_uuid, committed_config)) {
-          return Status::InvalidArgument(
-              Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
-                        server_uuid, SecureShortDebugString(committed_config)));
-        }
-        if (!server.has_member_type()) {
-          return Status::InvalidArgument("server must have member_type specified",
-                                         SecureShortDebugString(req));
-        }
-        if (!server.has_last_known_addr()) {
-          return Status::InvalidArgument("server must have last_known_addr specified",
-                                         SecureShortDebugString(req));
-        }
-        *new_config.add_peers() = server;
-        break;
 
-      case REMOVE_PEER:
-        if (server_uuid == peer_uuid()) {
-          return Status::InvalidArgument(
-              Substitute("Cannot remove peer $0 from the config because it is the leader. "
-                         "Force another leader to be elected to remove this server. "
-                         "Consensus state: $1",
-                         server_uuid,
-                         SecureShortDebugString(cmeta_->ToConsensusStatePB())));
-        }
-        if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
-          return Status::NotFound(
-              Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
-                        server_uuid, SecureShortDebugString(committed_config)));
-        }
-        break;
+    // Enforce the "one by one" config change rules, even with the bulk API.
+    // Keep track of total voters added, including non-voters promoted to
+    // voters, and removed, including voters demoted to non-voters.
+    int num_voters_modified = 0;
 
-      case MODIFY_PEER:
-        if (server.member_type() == RaftPeerPB::UNKNOWN_MEMBER_TYPE) {
-          return Status::InvalidArgument("Cannot change replica type to UNKNOWN_MEMBER_TYPE");
-        }
-        if (server_uuid == peer_uuid()) {
-          return Status::InvalidArgument(
-              Substitute("Cannot modify peer $0 because it is the leader. "
-                         "Force another leader to be elected to modify this replica. "
-                         "Consensus state: $1",
-                         server_uuid,
-                         SecureShortDebugString(cmeta_->ToConsensusStatePB())));
-        }
-        RaftPeerPB* peer_pb;
-        RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &peer_pb));
-        // Validate the changes.
-        if (ReplicaTypesEqual(*peer_pb, server)) {
-          return Status::InvalidArgument("Cannot change replica type to same type");
-        }
-        peer_pb->set_member_type(server.member_type());
-        // Override attributes only if they are explicitly passed in the request.
-        // TODO(mpercy): It seems cleaner to bulk-overwrite 'attrs' with
-        // whatever is passed into the request, but that would make it more
-        // complicated to correctly handle a non-voter that had both its
-        // "promote" and "replace" flags set.
-        if (server.attrs().has_promote()) {
-          peer_pb->mutable_attrs()->set_promote(server.attrs().promote());
-        }
-        if (server.attrs().has_replace()) {
-          peer_pb->mutable_attrs()->set_replace(server.attrs().replace());
+    // A record of the peers being modified so that we can enforce only one
+    // change per peer per request.
+    unordered_set<string> peers_modified;
+
+    for (const auto& item : req.config_changes()) {
+      if (PREDICT_FALSE(!item.has_type())) {
+        *error_code = TabletServerErrorPB::INVALID_CONFIG;
+        return Status::InvalidArgument("Must specify 'type' argument",
+                                       SecureShortDebugString(req));
+      }
+      if (PREDICT_FALSE(!item.has_peer())) {
+        *error_code = TabletServerErrorPB::INVALID_CONFIG;
+        return Status::InvalidArgument("Must specify 'peer' argument",
+                                       SecureShortDebugString(req));
+      }
+
+      ChangeConfigType type = item.type();
+      const RaftPeerPB& peer = item.peer();
+
+      if (PREDICT_FALSE(!peer.has_permanent_uuid())) {
+        return Status::InvalidArgument("peer must have permanent_uuid specified",
+                                       SecureShortDebugString(req));
+      }
+
+      if (!InsertIfNotPresent(&peers_modified, peer.permanent_uuid())) {
+        return Status::InvalidArgument(
+            Substitute("only one change allowed per peer: peer $0 appears more "
+                       "than once in the config change request",
+                       peer.permanent_uuid()),
+            SecureShortDebugString(req));
+      }
+
+      const string& server_uuid = peer.permanent_uuid();
+      switch (type) {
+        case ADD_PEER:
+          // Ensure the peer we are adding is not already a member of the configuration.
+          if (IsRaftConfigMember(server_uuid, committed_config)) {
+            return Status::InvalidArgument(
+                Substitute("Server with UUID $0 is already a member of the config. RaftConfig: $1",
+                           server_uuid, SecureShortDebugString(committed_config)));
+          }
+          if (!peer.has_member_type()) {
+            return Status::InvalidArgument("peer must have member_type specified",
+                                           SecureShortDebugString(req));
+          }
+          if (!peer.has_last_known_addr()) {
+            return Status::InvalidArgument("peer must have last_known_addr specified",
+                                           SecureShortDebugString(req));
+          }
+          if (peer.member_type() == RaftPeerPB::VOTER) {
+            num_voters_modified++;
+          }
+          *new_config.add_peers() = peer;
+          break;
+
+        case REMOVE_PEER:
+          if (server_uuid == peer_uuid()) {
+            return Status::InvalidArgument(
+                Substitute("Cannot remove peer $0 from the config because it is the leader. "
+                           "Force another leader to be elected to remove this peer. "
+                           "Consensus state: $1",
+                           server_uuid,
+                           SecureShortDebugString(cmeta_->ToConsensusStatePB())));
+          }
+          if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
+            return Status::NotFound(
+                Substitute("Server with UUID $0 not a member of the config. RaftConfig: $1",
+                           server_uuid, SecureShortDebugString(committed_config)));
+          }
+          if (IsRaftConfigVoter(server_uuid, committed_config)) {
+            num_voters_modified++;
+          }
+          break;
+
+        case MODIFY_PEER: {
+          RaftPeerPB* modified_peer;
+          RETURN_NOT_OK(GetRaftConfigMember(&new_config, server_uuid, &modified_peer));
+          const RaftPeerPB orig_peer(*modified_peer);
+          // Override 'member_type' and items within 'attrs' only if they are
+          // explicitly passed in the request. At least one field must be
+          // modified to be a valid request.
+          if (peer.has_member_type() && peer.member_type() != modified_peer->member_type()) {
+            if (modified_peer->member_type() == RaftPeerPB::VOTER ||
+                peer.member_type() == RaftPeerPB::VOTER) {
+              // This is a 'member_type' change involving a VOTER, i.e. a
+              // promotion or demotion.
+              num_voters_modified++;
+            }
+            // A leader must be forced to step down before demoting it.
+            if (server_uuid == peer_uuid()) {
+              return Status::InvalidArgument(
+                  Substitute("Cannot modify member type of peer $0 because it is the leader. "
+                              "Cause another leader to be elected to modify this peer. "
+                              "Consensus state: $1",
+                              server_uuid,
+                              SecureShortDebugString(cmeta_->ToConsensusStatePB())));
+            }
+            modified_peer->set_member_type(peer.member_type());
+          }
+          if (peer.attrs().has_promote()) {
+            modified_peer->mutable_attrs()->set_promote(peer.attrs().promote());
+          }
+          if (peer.attrs().has_replace()) {
+            modified_peer->mutable_attrs()->set_replace(peer.attrs().replace());
+          }
+          // Ensure that MODIFY_PEER actually modified something.
+          if (MessageDifferencer::Equals(orig_peer, *modified_peer)) {
+            return Status::InvalidArgument("must modify a field when calling MODIFY_PEER");
+          }
+          break;
         }
-        break;
 
-      default:
-        return Status::NotSupported(Substitute(
-            "$0: unsupported type of configuration change",
-            ChangeConfigType_Name(type)));
+        default:
+          return Status::NotSupported(Substitute(
+              "$0: unsupported type of configuration change",
+              ChangeConfigType_Name(type)));
+      }
+    }
+
+    // Don't allow no-op config changes to be committed.
+    if (MessageDifferencer::Equals(committed_config, new_config)) {
+      return Status::InvalidArgument("requested configuration change does not "
+                                     "actually modify the config",
+                                     SecureShortDebugString(req));
     }
 
+    // Ensure this wasn't an illegal bulk change.
+    if (num_voters_modified > 1) {
+      return Status::InvalidArgument("it is not safe to modify the VOTER status "
+                                     "of more than one peer at a time",
+                                     SecureShortDebugString(req));
+    }
+
+    // We'll assign a new opid_index to this config change.
+    new_config.clear_opid_index();
+
     RETURN_NOT_OK(ReplicateConfigChangeUnlocked(
         std::move(committed_config), std::move(new_config), std::bind(
             &RaftConsensus::MarkDirtyOnSuccess,
@@ -1761,7 +1844,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
             string("Config change replication complete"),
             std::move(client_cb),
             std::placeholders::_1)));
-  }
+  } // Release lock before signaling request.
   peer_manager_->SignalRequest();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 3bdb53a..2c438d1 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -252,6 +252,11 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                       StdStatusCallback client_cb,
                       boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
 
+  // Implement a BulkChangeConfig() request.
+  Status BulkChangeConfig(const BulkChangeConfigRequestPB& req,
+                          StdStatusCallback client_cb,
+                          boost::optional<tserver::TabletServerErrorPB::Code>* error_code);
+
   // Implement an UnsafeChangeConfig() request.
   Status UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
                             tserver::TabletServerErrorPB::Code* error_code);

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index ddbe798..757f6d0 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -64,6 +64,7 @@ namespace itest {
 
 using client::KuduSchema;
 using client::KuduSchemaBuilder;
+using consensus::BulkChangeConfigRequestPB;
 using consensus::ChangeConfigRequestPB;
 using consensus::ChangeConfigResponsePB;
 using consensus::ConsensusStatePB;
@@ -340,6 +341,31 @@ Status GetConsensusState(const TServerDetails* replica,
   return Status::OK();
 }
 
+Status WaitUntilNoPendingConfig(const TServerDetails* replica,
+                                const std::string& tablet_id,
+                                const MonoDelta& timeout,
+                                consensus::ConsensusStatePB* cstate) {
+  ConsensusStatePB cstate_tmp;
+  MonoTime start = MonoTime::Now();
+  MonoTime deadline = start + timeout;
+  MonoTime now;
+  Status s;
+  while ((now = MonoTime::Now()) < deadline) {
+    s = GetConsensusState(replica, tablet_id, deadline - now, &cstate_tmp);
+    if (s.ok() && !cstate_tmp.has_pending_config()) {
+      if (cstate) {
+        *cstate = std::move(cstate_tmp);
+      }
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(30));
+  }
+  return Status::TimedOut(Substitute("There is still a pending config after waiting for $0. "
+                                     "Last consensus state: $1. Last status: $2",
+                                     (MonoTime::Now() - start).ToString(),
+                                     SecureShortDebugString(cstate_tmp), s.ToString()));
+}
+
 Status WaitUntilCommittedConfigNumVotersIs(int config_size,
                                            const TServerDetails* replica,
                                            const std::string& tablet_id,
@@ -780,6 +806,35 @@ Status ChangeReplicaType(const TServerDetails* leader,
   return Status::OK();
 }
 
+Status BulkChangeConfig(const TServerDetails* leader,
+                        const std::string& tablet_id,
+                        const vector<BulkChangeConfigRequestPB::ConfigChangeItemPB>& changes,
+                        const MonoDelta& timeout,
+                        const boost::optional<int64_t>& cas_config_index,
+                        tserver::TabletServerErrorPB::Code* error_code) {
+  BulkChangeConfigRequestPB req;
+  req.set_dest_uuid(leader->uuid());
+  req.set_tablet_id(tablet_id);
+  if (cas_config_index) {
+    req.set_cas_config_opid_index(*cas_config_index);
+  }
+  for (const auto& change : changes) {
+    *req.add_config_changes() = change;
+  }
+
+  ChangeConfigResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  RETURN_NOT_OK(leader->consensus_proxy->BulkChangeConfig(req, &resp, &rpc));
+  if (resp.has_error()) {
+    if (error_code) {
+      *error_code = resp.error().code();
+    }
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
 Status ListTablets(const TServerDetails* ts,
                    const MonoDelta& timeout,
                    vector<ListTabletsResponsePB::StatusAndSchemaPB>* tablets) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index e9e13cc..64260f1 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -150,6 +150,14 @@ Status GetConsensusState(const TServerDetails* replica,
                          const MonoDelta& timeout,
                          consensus::ConsensusStatePB* consensus_state);
 
+// Wait until there is no longer a pending config on the specified server.
+// If OK is returned, the consensus state is also returned in 'cstate' if it is
+// not set to nullptr.
+Status WaitUntilNoPendingConfig(const TServerDetails* replica,
+                                const std::string& tablet_id,
+                                const MonoDelta& timeout,
+                                consensus::ConsensusStatePB* cstate = nullptr);
+
 // Wait until the number of voters in the committed consensus configuration is
 // 'quorum_size', according to the specified replica.
 Status WaitUntilCommittedConfigNumVotersIs(int config_size,
@@ -296,6 +304,15 @@ Status ChangeReplicaType(const TServerDetails* leader,
                          const boost::optional<int64_t>& cas_config_index = boost::none,
                          tserver::TabletServerErrorPB::Code* error_code = nullptr);
 
+// Convenience function for bulk change config API.
+Status BulkChangeConfig(const TServerDetails* leader,
+                        const std::string& tablet_id,
+                        const std::vector<consensus::BulkChangeConfigRequestPB
+                                                            ::ConfigChangeItemPB>& changes,
+                        const MonoDelta& timeout,
+                        const boost::optional<int64_t>& cas_config_index = boost::none,
+                        tserver::TabletServerErrorPB::Code* error_code = nullptr);
+
 // Get the list of tablets from the remote server.
 Status ListTablets(const TServerDetails* ts,
                    const MonoDelta& timeout,

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/integration-tests/raft_config_change-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_config_change-itest.cc b/src/kudu/integration-tests/raft_config_change-itest.cc
index b0f0cba..8df15a5 100644
--- a/src/kudu/integration-tests/raft_config_change-itest.cc
+++ b/src/kudu/integration-tests/raft_config_change-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstdint>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -23,11 +24,15 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
@@ -37,14 +42,22 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using kudu::consensus::ADD_PEER;
 using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::GetRaftConfigMember;
+using kudu::consensus::MODIFY_PEER;
 using kudu::consensus::RaftPeerAttrsPB;
 using kudu::consensus::RaftPeerPB;
+using kudu::consensus::REMOVE_PEER;
+using kudu::itest::BulkChangeConfig;
 using kudu::itest::TServerDetails;
+using kudu::pb_util::SecureShortDebugString;
 using std::string;
 using std::unordered_set;
 using std::vector;
@@ -214,4 +227,220 @@ TEST_F(RaftConfigChangeITest, TestNonVoterPromotion) {
   NO_FATALS(cluster_->AssertNoCrashes());
 }
 
+// Functional test for the BulkChangeConfig RPC API: valid batched changes and rejected ones.
+TEST_F(RaftConfigChangeITest, TestBulkChangeConfig) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const int kNumTabletServers = 4;
+  const int kNumInitialReplicas = 3;
+  NO_FATALS(StartCluster({"--enable_leader_failure_detection=false",
+                          "--raft_prepare_replacement_before_eviction=true"},
+                         {"--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+                          "--raft_prepare_replacement_before_eviction=true"},
+                         kNumTabletServers));
+
+  // Create the test table; the assertions below expect it to have exactly one tablet.
+  TestWorkload workload(cluster_.get());
+  workload.Setup();
+
+  ASSERT_OK(inspect_->WaitForReplicaCount(kNumInitialReplicas));
+  master::GetTableLocationsResponsePB table_locations;
+  ASSERT_OK(itest::GetTableLocations(cluster_->master_proxy(), TestWorkload::kDefaultTableName,
+                                     kTimeout, &table_locations));
+  ASSERT_EQ(1, table_locations.tablet_locations_size()); // Only 1 tablet.
+  ASSERT_EQ(kNumInitialReplicas, table_locations.tablet_locations().begin()->replicas_size());
+  string tablet_id = table_locations.tablet_locations().begin()->tablet_id();
+  unordered_set<int> replica_indexes;
+  for (int i = 0; i < table_locations.tablet_locations().begin()->replicas_size(); i++) {
+    const auto& replica = table_locations.tablet_locations().begin()->replicas(i);
+    int idx = cluster_->tablet_server_index_by_uuid(replica.ts_info().permanent_uuid());
+    replica_indexes.emplace(idx);
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(idx)->uuid()],
+                                            tablet_id, kTimeout));
+  }
+  ASSERT_EQ(kNumInitialReplicas, replica_indexes.size());
+  const int kLeaderIndex = *replica_indexes.begin();  // Arbitrary; elected explicitly below.
+  int new_replica_index = -1;  // Will be a tserver that hosts no replica yet.
+  for (int i = 0; i < kNumTabletServers; i++) {
+    if (!ContainsKey(replica_indexes, i)) {
+      new_replica_index = i;
+    }
+  }
+  ASSERT_NE(-1, new_replica_index);
+
+  string leader_uuid = cluster_->tablet_server(kLeaderIndex)->uuid();
+  auto* leader_replica = ts_map_[leader_uuid];
+  ASSERT_OK(itest::StartElection(leader_replica, tablet_id, kTimeout));
+  workload.Start();  // Write some rows before changing the config.
+  while (workload.rows_inserted() < 100) {  // NOTE(review): no deadline on this wait.
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  // Shut down the master so it cannot act on the config changes made below.
+  cluster_->master()->Shutdown();
+
+  struct BulkSpec {
+    consensus::ChangeConfigType change_type;  // e.g. ADD_PEER, MODIFY_PEER, REMOVE_PEER.
+    int tserver_index;  // Index into cluster_->tablet_server().
+    RaftPeerPB::MemberType member_type;  // Sent as the peer's member_type.
+    bool replace;  // Sent as the peer's attrs.replace.
+    bool promote;  // Sent as the peer's attrs.promote.
+  };
+
+  // Sends 'changes' to the leader as one BulkChangeConfig RPC; returns the resulting Status.
+  auto bulk_change = [&](const vector<BulkSpec>& changes,
+                         boost::optional<int64_t> cas_config_index = boost::none) {
+    vector<consensus::BulkChangeConfigRequestPB::ConfigChangeItemPB> changes_pb;
+    for (const auto& chg : changes) {
+      const auto& ts_uuid = cluster_->tablet_server(chg.tserver_index)->uuid();
+      auto* replica = ts_map_[ts_uuid];
+
+      consensus::BulkChangeConfigRequestPB::ConfigChangeItemPB change_pb;
+      change_pb.set_type(chg.change_type);
+
+      RaftPeerPB* peer = change_pb.mutable_peer();
+      peer->set_permanent_uuid(ts_uuid);
+      peer->set_member_type(chg.member_type);
+      peer->mutable_attrs()->set_replace(chg.replace);
+      peer->mutable_attrs()->set_promote(chg.promote);
+      *peer->mutable_last_known_addr() = replica->registration.rpc_addresses(0);
+      changes_pb.emplace_back(std::move(change_pb));
+    }
+
+    LOG(INFO) << "submitting config change with changes:";
+    for (const auto& change_pb : changes_pb) {
+      LOG(INFO) << SecureShortDebugString(change_pb);
+    }
+    return BulkChangeConfig(leader_replica, tablet_id, changes_pb,
+                            kTimeout, cas_config_index);
+  };
+
+  // 1) Add a voter. Change config to: V, V, V, V.
+  ASSERT_OK(bulk_change({ { ADD_PEER, new_replica_index, RaftPeerPB::VOTER,
+                            /*replace=*/ false, /*promote=*/ false } }));
+  ConsensusStatePB cstate;
+  ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
+  ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
+  ASSERT_EQ(kNumTabletServers, CountVoters(cstate.committed_config()));
+
+  // 2) Simultaneously demote one voter and modify another voter's attributes.
+  //    Change config to: V, V, N, V+p.
+  //    Note: setting a VOTER's attribute promote=true is meaningless.
+  int replica1_idx = (kLeaderIndex + 1) % kNumTabletServers;
+  int replica2_idx = (kLeaderIndex + 2) % kNumTabletServers;
+  ASSERT_OK(bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
+                            /*replace=*/ false, /*promote=*/ false },
+                          { MODIFY_PEER, replica2_idx, RaftPeerPB::VOTER,
+                            /*replace=*/ false,  /*promote=*/ true } }));
+
+  ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
+  ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
+  ASSERT_EQ(kNumInitialReplicas, CountVoters(cstate.committed_config()));
+
+  RaftPeerPB* peer;  // Set by GetRaftConfigMember().
+  ASSERT_OK(GetRaftConfigMember(cstate.mutable_committed_config(),
+                                cluster_->tablet_server(replica2_idx)->uuid(), &peer));
+  ASSERT_EQ(RaftPeerPB::VOTER, peer->member_type());
+  ASSERT_TRUE(peer->attrs().promote()) << SecureShortDebugString(*peer);
+
+  // 3) Single-attribute modification. Change config to: V, V, N+r, V+p.
+  //    Note: at the time of writing, if the master is disabled this
+  //    configuration will not trigger any actions such as promotion or
+  //    eviction.
+  ASSERT_OK(bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
+                            /*replace=*/ true, /*promote=*/ false } }));
+
+  ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
+  ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size())
+      << SecureShortDebugString(cstate);
+  ASSERT_EQ(kNumInitialReplicas, CountVoters(cstate.committed_config()))
+      << SecureShortDebugString(cstate);
+
+  ASSERT_OK(GetRaftConfigMember(cstate.mutable_committed_config(),
+                                cluster_->tablet_server(replica1_idx)->uuid(), &peer));
+  ASSERT_EQ(RaftPeerPB::NON_VOTER, peer->member_type());
+  ASSERT_TRUE(peer->attrs().replace()) << SecureShortDebugString(*peer);
+
+  // 4) Deny changing config (illegally) from: { V, V, N+r, V+p } to: { V, V, V, N }
+  //    because that would be both a promotion and a demotion in one step.
+  Status s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
+                             /*replace=*/ false, /*promote=*/ false },
+                           { MODIFY_PEER, replica2_idx, RaftPeerPB::NON_VOTER,
+                             /*replace=*/ false, /*promote=*/ false } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "not safe to modify the VOTER status of "
+                                    "more than one peer at a time");
+
+  // 5) The caller must not be allowed to make the leader a NON_VOTER.
+  s = bulk_change({ { MODIFY_PEER, kLeaderIndex, RaftPeerPB::NON_VOTER,
+                      /*replace=*/ false, /*promote=*/ false } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "Cannot modify member type of peer .* because it is the leader");
+
+  // 6) The 'cas_config_index' argument must be respected: a mismatched index is rejected.
+  int64_t committed_config_opid_index = cstate.committed_config().opid_index();
+  s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::NON_VOTER,
+                      /*replace=*/ false, /*promote=*/ true } }, committed_config_opid_index + 1);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "specified cas_config_opid_index of .* but "
+                                   "the committed config has opid_index of .*");
+
+  // 7) Evict down to 2 voters. We will evict a voter and a non-voter at once.
+  ASSERT_OK(bulk_change({ { REMOVE_PEER, replica1_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
+                            /*replace=*/ false, /*promote=*/ false },
+                          { REMOVE_PEER, replica2_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
+                            /*replace=*/ false, /*promote=*/ false } }));
+  ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
+  ASSERT_EQ(2, cstate.committed_config().peers_size());
+  ASSERT_EQ(2, CountVoters(cstate.committed_config()));
+
+  // 8) We should reject adding multiple voters at once.
+  s = bulk_change({ { ADD_PEER, replica1_idx, RaftPeerPB::VOTER,
+                      /*replace=*/ false, /*promote=*/ false },
+                    { ADD_PEER, replica2_idx, RaftPeerPB::VOTER,
+                      /*replace=*/ false, /*promote=*/ false } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "not safe to modify the VOTER status of "
+                                   "more than one peer at a time");
+
+  // 9) Add them back one at a time so we get to full strength (4 voters) again.
+  auto to_restore = { replica1_idx, replica2_idx };
+  for (auto r : to_restore) {
+    ASSERT_OK(bulk_change({ { ADD_PEER, r, RaftPeerPB::VOTER,
+                              /*replace=*/ false, /*promote=*/ false } }));
+    ASSERT_OK(WaitUntilNoPendingConfig(leader_replica, tablet_id, kTimeout, &cstate));
+  }
+  ASSERT_EQ(kNumTabletServers, cstate.committed_config().peers_size());
+  ASSERT_EQ(kNumTabletServers, CountVoters(cstate.committed_config()));
+
+  // 10) We should reject removing multiple voters at once.
+  s = bulk_change({ { REMOVE_PEER, replica1_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
+                      /*replace=*/ false, /*promote=*/ false },
+                    { REMOVE_PEER, replica2_idx, RaftPeerPB::UNKNOWN_MEMBER_TYPE,
+                      /*replace=*/ false, /*promote=*/ false } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "not safe to modify the VOTER status of "
+                                   "more than one peer at a time");
+
+  // 11) Reject no-ops (a MODIFY_PEER that leaves the peer unchanged).
+  s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
+                      /*replace=*/ false, /*promote=*/ false } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "must modify a field when calling MODIFY_PEER");
+
+  // 12) Reject empty bulk change config operations.
+  s = bulk_change({ });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "requested configuration change does not "
+                                   "actually modify the config");
+
+  // 13) Reject multiple changes to the same peer in a single request.
+  s = bulk_change({ { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
+                      /*replace=*/ true, /*promote=*/ false },
+                    { MODIFY_PEER, replica1_idx, RaftPeerPB::VOTER,
+                      /*replace=*/ false, /*promote=*/ true } });
+  ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_MATCHES(s.ToString(), "only one change allowed per peer");
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 4853e5f..8c37676 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -670,7 +670,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
                                        tablet_id, replica, kTimeout);
     const auto s_str = s.ToString();
     ASSERT_TRUE(s.IsInvalidArgument()) << s_str;
-    ASSERT_STR_CONTAINS(s_str, "Cannot change replica type to same type");
+    ASSERT_STR_CONTAINS(s_str, "must modify a field when calling MODIFY_PEER");
   }
 
   // It should be possible to promote a non-voter to voter.
@@ -683,7 +683,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
                                        tablet_id, replica, kTimeout);
     const auto s_str = s.ToString();
     ASSERT_TRUE(s.IsInvalidArgument()) << s_str;
-    ASSERT_STR_CONTAINS(s_str, "Cannot change replica type to same type");
+    ASSERT_STR_CONTAINS(s_str, "must modify a field when calling MODIFY_PEER");
   }
 
   {
@@ -695,7 +695,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
     const auto s_demote_str = s_demote.ToString();
     ASSERT_TRUE(s_demote.IsInvalidArgument()) << s_demote_str;
     ASSERT_STR_MATCHES(s_demote_str,
-        "Cannot modify peer .* because it is the leader");
+        "Cannot modify member type of peer .* because it is the leader");
 
     // It should be impossible to promote a leader replica since it's
     // already a voter.
@@ -703,8 +703,7 @@ TEST_F(RaftConsensusNonVoterITest, PromoteAndDemote) {
                                                      tablet_id, leader, kTimeout);
     const auto s_promote_str = s_promote.ToString();
     ASSERT_TRUE(s_promote.IsInvalidArgument()) << s_promote_str;
-    ASSERT_STR_MATCHES(s_promote_str,
-        "Cannot modify peer .* because it is the leader");
+    ASSERT_STR_CONTAINS(s_promote_str, "must modify a field when calling MODIFY_PEER");
   }
 
   // Demote the replica back.

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 37c0f5c..22753d8 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -136,6 +136,7 @@ DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
 
 using google::protobuf::RepeatedPtrField;
+using kudu::consensus::BulkChangeConfigRequestPB;
 using kudu::consensus::ChangeConfigRequestPB;
 using kudu::consensus::ChangeConfigResponsePB;
 using kudu::consensus::ConsensusRequestPB;
@@ -1039,6 +1040,30 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
   // The success case is handled when the callback fires.
 }
 
+void ConsensusServiceImpl::BulkChangeConfig(const BulkChangeConfigRequestPB* req,
+                                            ChangeConfigResponsePB* resp,
+                                            RpcContext* context) {
+  VLOG(1) << "Received BulkChangeConfig RPC: " << SecureDebugString(*req);
+  if (!CheckUuidMatchOrRespond(tablet_manager_, "BulkChangeConfig", req, resp, context)) {
+    return;  // NOTE(review): the helper presumably already sent the error response.
+  }
+  scoped_refptr<TabletReplica> replica;
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
+    return;  // NOTE(review): the helper presumably already sent the error response.
+  }
+  // Get the RaftConsensus and submit the batch; an immediate failure is answered below.
+  shared_ptr<RaftConsensus> consensus;
+  if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
+  boost::optional<TabletServerErrorPB::Code> error_code;  // Used on failure below.
+  Status s = consensus->BulkChangeConfig(*req, BindHandleResponse(req, resp, context), &error_code);
+  if (PREDICT_FALSE(!s.ok())) {
+    HandleErrorResponse(req, resp, context, error_code, s);
+    return;
+  }
+  // On success, the response is sent later by the BindHandleResponse() callback.
+}
+
 void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB* req,
                                               UnsafeChangeConfigResponsePB* resp,
                                               RpcContext* context) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/eb0718e1/src/kudu/tserver/tablet_service.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index 5f9cc02..cb24cd3 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -45,6 +45,7 @@ class ServerBase;
 } // namespace server
 
 namespace consensus {
+class BulkChangeConfigRequestPB;
 class ChangeConfigRequestPB;
 class ChangeConfigResponsePB;
 class ConsensusRequestPB;
@@ -200,6 +201,10 @@ class ConsensusServiceImpl : public consensus::ConsensusServiceIf {
                             consensus::ChangeConfigResponsePB* resp,
                             rpc::RpcContext* context) OVERRIDE;
 
+  virtual void BulkChangeConfig(const consensus::BulkChangeConfigRequestPB* req,
+                                consensus::ChangeConfigResponsePB* resp,
+                                rpc::RpcContext* context) OVERRIDE;  // Multi-peer ChangeConfig.
+
   virtual void UnsafeChangeConfig(const consensus::UnsafeChangeConfigRequestPB* req,
                                   consensus::UnsafeChangeConfigResponsePB* resp,
                                   rpc::RpcContext* context) OVERRIDE;