You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/22 19:03:17 UTC
[kudu] branch master updated: [consensus] KUDU-2727 lock-free
CheckLeadershipAndBindTerm()
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new da1c66b [consensus] KUDU-2727 lock-free CheckLeadershipAndBindTerm()
da1c66b is described below
commit da1c66b61c4924a4726aaeb171e79cc0b03ca65c
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Jun 5 17:01:53 2020 -0700
[consensus] KUDU-2727 lock-free CheckLeadershipAndBindTerm()
This addresses the following lock contention issue outlined in KUDU-2727:
tids=[93293,93284,93285,93286,93287,93288,93289,93290,93291,93292,93304,93294,93295,93296,93297,93298,93299,93300,93301,93302,93303,93313,93322,93321,93320,93319,93318,93317,93316,93315,93314,93283,93312,93311,93310,93309,93308,93307,93306,93305]
0x7f61b79fc5e0 <unknown>
0x1ec35f4 base::internal::SpinLockDelay()
0x1ec347c base::SpinLock::SlowLock()
0xb7deb8 kudu::consensus::RaftConsensus::CheckLeadershipAndBindTerm()
0xaab010 kudu::tablet::TransactionDriver::ExecuteAsync()
0xaa344c kudu::tablet::TabletReplica::SubmitWrite()
0x928fb0 kudu::tserver::TabletServiceImpl::Write()
0x1d2e8d9 kudu::rpc::GeneratedServiceIf::Handle()
0x1d2efd9 kudu::rpc::ServicePool::RunThread()
0x1ea4a84 kudu::Thread::SuperviseThread()
0x7f61b79f4e25 start_thread
0x7f61b5cd234d __clone
tids=[93383]
0x7f61b79fc5e0 <unknown>
0x7f61b79f8cf2 __pthread_cond_timedwait
0x1dfcfa9 kudu::ConditionVariable::WaitUntil()
0xb73bc7 kudu::consensus::RaftConsensus::UpdateReplica()
0xb75128 kudu::consensus::RaftConsensus::Update()
0x92c5d1 kudu::tserver::ConsensusServiceImpl::UpdateConsensus()
0x1d2e8d9 kudu::rpc::GeneratedServiceIf::Handle()
0x1d2efd9 kudu::rpc::ServicePool::RunThread()
0x1ea4a84 kudu::Thread::SuperviseThread()
0x7f61b79f4e25 start_thread
0x7f61b5cd234d __clone
In addition, with this patch RaftConsensus::DumpStatusHtml() no longer
blocks Raft consensus activity and isn't blocked by it either.
The inspiration for this update came from Yugabyte-DB:
https://github.com/yugabyte/yugabyte-db/commit/7b3271e4a0f34ff3c3f6e8961288db9cee335461
Change-Id: I934ae3035d893fd850afe27d96f8dd6612c9ffbd
Reviewed-on: http://gerrit.cloudera.org:8080/16034
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/consensus_meta.cc | 91 +++++++++++++++--
src/kudu/consensus/consensus_meta.h | 23 ++++-
src/kudu/consensus/raft_consensus.cc | 112 +++++++++++++++------
src/kudu/consensus/raft_consensus.h | 11 +-
src/kudu/integration-tests/raft_consensus-itest.cc | 3 +-
src/kudu/tserver/tablet_service.cc | 7 +-
6 files changed, 202 insertions(+), 45 deletions(-)
diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc
index 98b4d34..18a8f47 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -16,7 +16,8 @@
// under the License.
#include "kudu/consensus/consensus_meta.h"
-#include <mutex>
+#include <cstddef>
+#include <limits>
#include <ostream>
#include <utility>
@@ -49,12 +50,75 @@ DEFINE_bool(cmeta_force_fsync, false,
"Whether fsync() should be called when consensus metadata files are updated");
TAG_FLAG(cmeta_force_fsync, advanced);
+using std::string;
+using strings::Substitute;
+
namespace kudu {
namespace consensus {
-using std::lock_guard;
-using std::string;
-using strings::Substitute;
+namespace {
+
+constexpr size_t kPackedRoleBits = 3;
+constexpr size_t kPackedTermBits = 8 * sizeof(uint64_t) - kPackedRoleBits;
+constexpr uint64_t kUnknownRolePacked = (1ULL << kPackedRoleBits) - 1;
+constexpr uint64_t kRoleMask = kUnknownRolePacked << kPackedTermBits;
+constexpr uint64_t kTermMask = ~kRoleMask;
+
+static_assert((kRoleMask | kTermMask) == std::numeric_limits<uint64_t>::max(),
+ "term and role should fit into uint64_t");
+static_assert((kTermMask & kRoleMask) == 0,
+ "term and role masks must not intersect");
+//
+// Packing role and term into uint64_t:
+//
+//   * * * * * ... * * * *
+//   ^     ^             ^
+//   63    60            0
+//
+// Bits 0..60 inclusive contain term. Bits 61..63 contain role.
+
+uint64_t PackRoleAndTerm(RaftPeerPB::Role role, int64_t term) {
+ // Ensure the term is not wider than kPackedTermBits: maximum possible is
+ // 2305843009213693951. Here it might be something like
+ //
+ // CHECK_EQ(0, kRoleMask & term) << "term is too big: " << term;
+ //
+ // However, sometimes the data read from disk is corrupted, and we don't want
+ // to crash just because of that. The corruption is detected and handled
+ // gracefully at a higher level (e.g., the server marks the corresponding replica
+ // as failed). With the current approach, the maximum acceptable term is
+ // 2305843009213693950; 2305843009213693951 (kTermMask) is a special value.
+ if (PREDICT_FALSE((term & kRoleMask) != 0)) {
+ // A special value to signal that a 'non-packable' term was supplied.
+ term = kTermMask;
+ }
+
+ // The allocated bit space for role is just 3 bits, but it's necessary
+ // to handle the constant 999 defined in the proto file for UNKNOWN_ROLE.
+ // Changing the constant behind UNKNOWN_ROLE is not an option
+ // due to compatibility issues.
+ uint64_t r = (role == RaftPeerPB::UNKNOWN_ROLE) ? kUnknownRolePacked
+ : static_cast<uint64_t>(role);
+ return (r << kPackedTermBits) | term;
+}
+
+RaftPeerPB::Role UnpackRole(uint64_t role_and_term_packed) {
+ const auto role = role_and_term_packed >> kPackedTermBits;
+ if (PREDICT_FALSE(role == kUnknownRolePacked)) {
+ return RaftPeerPB::UNKNOWN_ROLE;
+ }
+ return static_cast<RaftPeerPB::Role>(role);
+}
+
+int64_t UnpackTerm(uint64_t role_and_term_packed) {
+ const auto t = role_and_term_packed & kTermMask;
+ if (PREDICT_FALSE(t == kTermMask)) {
+ LOG(FATAL) << "packed term is invalid: " << t;
+ }
+ return static_cast<int64_t>(t);
+}
+
+} // anonymous namespace
int64_t ConsensusMetadata::current_term() const {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
@@ -66,6 +130,7 @@ void ConsensusMetadata::set_current_term(int64_t term) {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
DCHECK_GE(term, kMinimumTerm);
pb_.set_current_term(term);
+ UpdateRoleAndTermCache();
}
bool ConsensusMetadata::has_voted_for() const {
@@ -253,8 +318,13 @@ Status ConsensusMetadata::Flush(FlushMode flush_mode) {
FLAGS_log_force_fsync_all || FLAGS_cmeta_force_fsync ? pb_util::SYNC : pb_util::NO_SYNC),
Substitute("Unable to write consensus meta file for tablet $0 to path $1",
tablet_id_, meta_file_path));
- RETURN_NOT_OK(UpdateOnDiskSize());
- return Status::OK();
+ return UpdateOnDiskSize();
+}
+
+ConsensusMetadata::RoleAndTerm ConsensusMetadata::GetRoleAndTerm() const {
+ // Read the cached role and term atomically to unpack them consistently.
+ const uint64_t val = role_and_term_cache_;
+ return std::make_pair(UnpackRole(val), UnpackTerm(val));
}
ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
@@ -265,7 +335,9 @@ ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
peer_uuid_(std::move(peer_uuid)),
has_pending_config_(false),
flush_count_for_tests_(0),
+ active_role_(RaftPeerPB::UNKNOWN_ROLE),
on_disk_size_(0) {
+ UpdateRoleAndTermCache();
}
Status ConsensusMetadata::Create(FsManager* fs_manager,
@@ -324,11 +396,18 @@ std::string ConsensusMetadata::LogPrefix() const {
void ConsensusMetadata::UpdateActiveRole() {
DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
active_role_ = GetConsensusRole(peer_uuid_, leader_uuid_, ActiveConfig());
+ UpdateRoleAndTermCache();
VLOG_WITH_PREFIX(1) << "Updating active role to " << RaftPeerPB::Role_Name(active_role_)
<< ". Consensus state: "
<< pb_util::SecureShortDebugString(ToConsensusStatePB());
}
+void ConsensusMetadata::UpdateRoleAndTermCache() {
+ DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
+ role_and_term_cache_ = PackRoleAndTerm(
+ active_role_, pb_.has_current_term() ? current_term() : -1);
+}
+
Status ConsensusMetadata::UpdateOnDiskSize() {
string path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
uint64_t on_disk_size;
diff --git a/src/kudu/consensus/consensus_meta.h b/src/kudu/consensus/consensus_meta.h
index df24fd2..44dd601 100644
--- a/src/kudu/consensus/consensus_meta.h
+++ b/src/kudu/consensus/consensus_meta.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <string>
+#include <utility>
#include <gtest/gtest_prod.h>
@@ -75,6 +76,8 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
NO_OVERWRITE
};
+ typedef std::pair<RaftPeerPB::Role, int64_t> RoleAndTerm;
+
// Accessors for current term.
int64_t current_term() const;
void set_current_term(int64_t term);
@@ -161,6 +164,9 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
return on_disk_size_.load(std::memory_order_relaxed);
}
+ // Return cached peer role and term, lock-free.
+ RoleAndTerm GetRoleAndTerm() const;
+
private:
friend class RaftConsensusQuorumTest;
friend class RefCountedThreadSafe<ConsensusMetadata>;
@@ -212,6 +218,9 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
// Updates the cached active role.
void UpdateActiveRole();
+ // Update the cached active role and term.
+ void UpdateRoleAndTermCache();
+
// Updates the cached on-disk size of the consensus metadata.
Status UpdateOnDiskSize();
@@ -229,15 +238,23 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
// RaftConfig used by the peers when there is a pending config change operation.
RaftConfigPB pending_config_;
- // Cached role of the peer_uuid_ within the active configuration.
- RaftPeerPB::Role active_role_;
-
// The number of times the metadata has been flushed to disk.
int64_t flush_count_for_tests_;
// Durable fields.
ConsensusMetadataPB pb_;
+ // Role of the peer_uuid_ within the active configuration.
+ RaftPeerPB::Role active_role_;
+
+ // Cached term and role of the peer_uuid_ within the active configuration,
+ // packed into 64-bit integer. These are used to get a consistent snapshot of
+ // the replica role and term in a lock-free manner. The cached information
+ // might become stale right after it is retrieved by the GetRoleAndTerm() method
+ // (i.e. active_role_ and term in pending_config_ are updated right after that),
+ // but it's totally fine for the intended use.
+ std::atomic<uint64_t> role_and_term_cache_;
+
// The on-disk size of the consensus metadata, as of the last call to
// Load() or Flush().
// The type is int64_t for consistency with other on-disk size metrics,
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cefa1bf..d37e6c9 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -187,6 +187,7 @@ RaftConsensus::RaftConsensus(
server_ctx_(std::move(server_ctx)),
state_(kNew),
rng_(GetRandomSeed32()),
+ leader_is_ready_(false),
leader_transfer_in_progress_(false),
withhold_votes_until_(MonoTime::Min()),
last_received_cur_leader_(MinimumOpId()),
@@ -197,6 +198,7 @@ RaftConsensus::RaftConsensus(
}
Status RaftConsensus::Init() {
+ LockGuard l(lock_);
DCHECK_EQ(kNew, state_) << State_Name(state_);
RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
SetStateUnlocked(kInitialized);
@@ -391,8 +393,6 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
}
bool RaftConsensus::IsRunning() const {
- ThreadRestrictions::AssertWaitAllowed();
- LockGuard l(lock_);
return state_ == kRunning;
}
@@ -693,7 +693,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
queue_->RegisterObserver(this);
bool was_leader = queue_->IsInLeaderMode();
RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
- if (!was_leader && server_ctx_.num_leaders) server_ctx_.num_leaders->Increment();
+ if (!was_leader && server_ctx_.num_leaders) {
+ server_ctx_.num_leaders->Increment();
+ }
// Initiate a NO_OP op that is sent at the beginning of every term
// change in raft.
@@ -711,8 +713,10 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
});
last_leader_communication_time_micros_ = 0;
+ RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
+ leader_is_ready_ = true;
- return AppendNewRoundToQueueUnlocked(round);
+ return Status::OK();
}
Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta) {
@@ -757,10 +761,57 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
}
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
- ThreadRestrictions::AssertWaitAllowed();
- LockGuard l(lock_);
- RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
- round->BindToTerm(CurrentTermUnlocked());
+#ifndef NDEBUG
+ const auto& msg = *round->replicate_msg();
+ DCHECK(!msg.has_id()) << "should not have an ID yet: "
+ << SecureShortDebugString(msg);
+#endif
+ // Get a snapshot of an atomic member for consistency between the condition
+ // and the error message, if any.
+ const State state = state_;
+ if (PREDICT_FALSE(state != kRunning)) {
+ return Status::IllegalState("RaftConsensus is not running",
+ Substitute("state $0", State_Name(state)));
+ }
+
+ // The order of reading role_and_term and leader_is_ready is essential to
+ // deal with situations when leader_is_ready and role_and_term are read
+ // in different Raft terms.
+ // * It's safe to bind to a stale term even if we've asserted leadership
+ // in a newer term: the stale op will be rejected on followers anyway
+ // because Raft guarantees that a majority of replicas have accepted
+ // the new term.
+ // * It's safe for the term to be stale if we haven't asserted leadership
+ // for a newer term because we'll exit out below.
+ // * It's unsafe to bind to a newer term if we've asserted leadership
+ // for a stale term, hence this ordering.
+ const auto role_and_term = cmeta_->GetRoleAndTerm();
+ const bool leader_is_ready = leader_is_ready_;
+
+ const auto role = role_and_term.first;
+ switch (role) {
+ case RaftPeerPB::LEADER:
+ if (leader_transfer_in_progress_) {
+ return Status::ServiceUnavailable("leader transfer in progress");
+ }
+ if (!leader_is_ready) {
+ // Leader replica should not accept write operations before scheduling
+ // the replication of a NO_OP to assert its leadership in the current
+ // term. Otherwise there might be a race, so the very first accepted
+ // write operation might have a timestamp lower than the timestamp
+ // of the NO_OP. That would trigger an assertion in MVCC.
+ return Status::ServiceUnavailable("leader is not yet ready");
+ }
+ break;
+
+ default:
+ return Status::IllegalState(
+ Substitute("replica $0 is not leader of this config: current role $1",
+ peer_uuid(), RaftPeerPB::Role_Name(role)));
+ }
+ const auto term = role_and_term.second;
+ round->BindToTerm(term);
+
return Status::OK();
}
@@ -836,10 +887,11 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
// We will process commit notifications while shutting down because a replica
// which has initiated a Prepare() / Replicate() may eventually commit even if
// its state has changed after the initial Append() / Update().
- if (PREDICT_FALSE(state_ != kRunning && state_ != kStopping)) {
+ const State state = state_;
+ if (PREDICT_FALSE(state != kRunning && state != kStopping)) {
LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
<< "Replica not in running state: "
- << State_Name(state_);
+ << State_Name(state);
return;
}
@@ -1233,8 +1285,8 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
LeaderRequest* deduped_req) {
DCHECK(lock_.is_locked());
- if (request->has_deprecated_committed_index() ||
- !request->has_all_replicated_index()) {
+ if (PREDICT_FALSE(request->has_deprecated_committed_index() ||
+ !request->has_all_replicated_index())) {
return Status::InvalidArgument("Leader appears to be running an earlier version "
"of Kudu. Please shut down and upgrade all servers "
"before restarting.");
@@ -1720,7 +1772,8 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
// If RaftConsensus is running, we use the latest OpId from the WAL to vote.
// Otherwise, we must be voting while tombstoned.
OpId local_last_logged_opid;
- switch (state_) {
+ const State state = state_;
+ switch (state) {
case kShutdown:
return Status::IllegalState("cannot vote while shut down");
case kRunning:
@@ -2174,11 +2227,13 @@ void RaftConsensus::Stop() {
TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
"peer", peer_uuid(),
"tablet", options_.tablet_id);
-
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- if (state_ == kStopping || state_ == kStopped || state_ == kShutdown) return;
+ const State state = state_;
+ if (state == kStopping || state == kStopped || state == kShutdown) {
+ return;
+ }
// Transition to kStopping state.
SetStateUnlocked(kStopping);
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
@@ -2199,7 +2254,9 @@ void RaftConsensus::Stop() {
{
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- if (pending_) CHECK_OK(pending_->CancelPendingOps());
+ if (pending_) {
+ CHECK_OK(pending_->CancelPendingOps());
+ }
SetStateUnlocked(kStopped);
// Clear leader status on Stop(), in case this replica was the leader. If
@@ -2423,6 +2480,7 @@ int64_t RaftConsensus::CurrentTerm() const {
}
void RaftConsensus::SetStateUnlocked(State new_state) {
+ DCHECK(lock_.is_locked());
switch (new_state) {
case kInitialized:
CHECK_EQ(kNew, state_);
@@ -2518,8 +2576,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
queue_->SetLeaderMode(pending_->GetCommittedIndex(),
CurrentTermUnlocked(),
active_config);
- RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
- return Status::OK();
+ return peer_manager_->UpdateRaftConfig(active_config);
}
const string& RaftConsensus::peer_uuid() const {
@@ -2569,16 +2626,12 @@ RaftConfigPB RaftConsensus::CommittedConfig() const {
}
void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
- RaftPeerPB::Role role;
- {
- ThreadRestrictions::AssertWaitAllowed();
- LockGuard l(lock_);
- if (state_ != kRunning) {
- out << "Tablet " << EscapeForHtmlToString(tablet_id()) << " not running" << std::endl;
- return;
- }
- role = cmeta_->active_role();
+ if (state_ != kRunning) {
+ out << "Tablet " << EscapeForHtmlToString(tablet_id())
+ << " not running" << std::endl;
+ return;
}
+ const RaftPeerPB::Role role = cmeta_->GetRoleAndTerm().first;
out << "<h1>Raft Consensus State</h1>" << std::endl;
@@ -2974,7 +3027,9 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
- if (term_metric_) term_metric_->set_value(new_term);
+ if (term_metric_) {
+ term_metric_->set_value(new_term);
+ }
last_received_cur_leader_ = MinimumOpId();
return Status::OK();
}
@@ -3121,6 +3176,7 @@ bool RaftConsensus::HasLeaderUnlocked() const {
void RaftConsensus::ClearLeaderUnlocked() {
DCHECK(lock_.is_locked());
+ leader_is_ready_ = false;
cmeta_->set_leader_uuid("");
}
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 33ec160..bc22c18 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -856,7 +856,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Coarse-grained lock that protects all mutable data members.
mutable simple_spinlock lock_;
- State state_;
+ std::atomic<State> state_;
// Consensus metadata persistence object.
scoped_refptr<ConsensusMetadata> cmeta_;
@@ -887,6 +887,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
std::shared_ptr<rpc::PeriodicTimer> failure_detector_;
+ // Whether a replica, switched into the leader mode, has successfully
+ // scheduled a NO_OP Raft message to replicate, asserting its leadership in
+ // the term where it has just become a leader.
+ std::atomic<bool> leader_is_ready_;
+
+ // A few fields used for the leadership transfer process.
std::atomic<bool> leader_transfer_in_progress_;
boost::optional<std::string> designated_successor_uuid_;
std::shared_ptr<rpc::PeriodicTimer> transfer_period_timer_;
@@ -1036,8 +1042,7 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
void NotifyReplicationFinished(const Status& status);
// Binds this round such that it may not be eventually executed in any term
- // other than 'term'.
- // See CheckBoundTerm().
+ // other than 'term'. See CheckBoundTerm().
void BindToTerm(int64_t term) {
DCHECK_EQ(bound_term_, -1);
bound_term_ = term;
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 22151c7..1f6a74c 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -783,7 +783,8 @@ TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
ASSERT_TRUE(resp.has_error());
Status s = StatusFromPB(resp.error().status());
EXPECT_TRUE(s.IsIllegalState());
- ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "is not leader of this config: current role FOLLOWER");
// TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER
// so that the client can properly handle this case! plumbing this is a little difficult
// so not addressing at the moment.
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 4a5c040..6a87a65 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -654,8 +654,8 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
public:
RpcOpCompletionCallback(rpc::RpcContext* context,
Response* response)
- : context_(context),
- response_(response) {}
+ : context_(context),
+ response_(response) {}
virtual void OpCompleted() OVERRIDE {
if (!status_.ok()) {
@@ -665,10 +665,9 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
} else {
context_->RespondSuccess();
}
- };
+ }
private:
-
TabletServerErrorPB* get_error() {
return response_->mutable_error();
}