You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/06/01 20:50:59 UTC
[1/3] kudu git commit: [tools] Use PrintTable to format ksck's
consensus matrix
Repository: kudu
Updated Branches:
refs/heads/master ec81bd9d2 -> fbaa7dce9
[tools] Use PrintTable to format ksck's consensus matrix
The first version of ksck's consensus matrix used its own table
code, but actually there was already nice table code available
in tool_action_common.h. This switches ksck to use it. It also
generalizes the table code to output to a generic ostream
instead of only cout; this was necessary for ksck-test but might
be useful in other ways later.
Change-Id: I8d77005f20091e6778702580e3a269d9689c5b0a
Reviewed-on: http://gerrit.cloudera.org:8080/7043
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/30682fd1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/30682fd1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/30682fd1
Branch: refs/heads/master
Commit: 30682fd13ffc68ebaeb8200e7450f8317b89bf85
Parents: ec81bd9
Author: Will Berkeley <wd...@apache.org>
Authored: Thu Jun 1 10:18:15 2017 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Thu Jun 1 19:50:28 2017 +0000
----------------------------------------------------------------------
src/kudu/tools/ksck-test.cc | 36 ++++----
src/kudu/tools/ksck.cc | 144 ++++++++++-------------------
src/kudu/tools/tool_action_common.cc | 55 ++++++-----
src/kudu/tools/tool_action_common.h | 4 +-
src/kudu/tools/tool_action_master.cc | 4 +-
src/kudu/tools/tool_action_tserver.cc | 4 +-
6 files changed, 107 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index bdf1789..90a8f1d 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -406,12 +406,12 @@ TEST_F(KsckTest, TestConsensusConflictExtraPeer) {
ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
ASSERT_STR_CONTAINS(err_stream_.str(),
"The consensus matrix is:\n"
- " Host Voters Current term Config index Committed?\n"
- " ------------------- ---------------- ------------ ------------ ----------\n"
- " config from master: A* B C Yes \n"
- " A: A* B C D 0 Yes \n"
- " B: A* B C 0 Yes \n"
- " C: A* B C 0 Yes");
+ " Config source | Voters | Current term | Config index | Committed?\n"
+ "---------------+------------------+--------------+--------------+------------\n"
+ " master | A* B C | | | Yes\n"
+ " A | A* B C D | 0 | | Yes\n"
+ " B | A* B C | 0 | | Yes\n"
+ " C | A* B C | 0 | | Yes");
}
TEST_F(KsckTest, TestConsensusConflictMissingPeer) {
@@ -427,12 +427,12 @@ TEST_F(KsckTest, TestConsensusConflictMissingPeer) {
ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
ASSERT_STR_CONTAINS(err_stream_.str(),
"The consensus matrix is:\n"
- " Host Voters Current term Config index Committed?\n"
- " ------------------- ------------ ------------ ------------ ----------\n"
- " config from master: A* B C Yes \n"
- " A: A* B 0 Yes \n"
- " B: A* B C 0 Yes \n"
- " C: A* B C 0 Yes");
+ " Config source | Voters | Current term | Config index | Committed?\n"
+ "---------------+--------------+--------------+--------------+------------\n"
+ " master | A* B C | | | Yes\n"
+ " A | A* B | 0 | | Yes\n"
+ " B | A* B C | 0 | | Yes\n"
+ " C | A* B C | 0 | | Yes");
}
TEST_F(KsckTest, TestConsensusConflictDifferentLeader) {
@@ -448,12 +448,12 @@ TEST_F(KsckTest, TestConsensusConflictDifferentLeader) {
ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
ASSERT_STR_CONTAINS(err_stream_.str(),
"The consensus matrix is:\n"
- " Host Voters Current term Config index Committed?\n"
- " ------------------- ------------ ------------ ------------ ----------\n"
- " config from master: A* B C Yes \n"
- " A: A B* C 0 Yes \n"
- " B: A* B C 0 Yes \n"
- " C: A* B C 0 Yes");
+ " Config source | Voters | Current term | Config index | Committed?\n"
+ "---------------+--------------+--------------+--------------+------------\n"
+ " master | A* B C | | | Yes\n"
+ " A | A B* C | 0 | | Yes\n"
+ " B | A* B C | 0 | | Yes\n"
+ " C | A* B C | 0 | | Yes");
}
TEST_F(KsckTest, TestOneOneTabletBrokenTable) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 7361841..a99d4c1 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -23,6 +23,7 @@
#include <iostream>
#include <map>
#include <mutex>
+#include <sstream>
#include "kudu/consensus/quorum_util.h"
#include "kudu/gutil/map-util.h"
@@ -32,6 +33,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/tools/color.h"
+#include "kudu/tools/tool_action_common.h"
#include "kudu/util/atomic.h"
#include "kudu/util/blocking_queue.h"
#include "kudu/util/locks.h"
@@ -66,6 +68,7 @@ using std::right;
using std::setw;
using std::shared_ptr;
using std::string;
+using std::stringstream;
using std::unordered_map;
using strings::Substitute;
@@ -600,7 +603,7 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
namespace {
-// A struct in which to consolidate the state of each replica for easier analysis.
+// A struct consolidating the state of each replica, for easier analysis.
struct ReplicaInfo {
KsckTabletReplica* replica;
KsckTabletServer* ts = nullptr;
@@ -609,103 +612,22 @@ struct ReplicaInfo {
boost::optional<KsckConsensusState> consensus_state;
};
-// Formatting constants.
-const char col_pad[] = " ";
-const int peer_width = 4;
-
-// Print a cell of the consensus matrix.
-void PrintCell(const string& val, size_t width) {
- Out() << setw(width) << left << val;
-}
-
-// Print a row (one config) of the consensus matrix.
-void PrintConfig(const map<string, char>& peer_uuid_mapping,
- const string& name,
- const boost::optional<KsckConsensusState>& config,
- const vector<size_t>& column_widths) {
- Out() << col_pad << setw(column_widths[0]) << right << Substitute("$0:", name);
- Out() << col_pad;
- if (!config) {
- Out() << "[no config retrieved]" << endl;
- return;
- }
- for (const auto& entry : peer_uuid_mapping) {
- if (!ContainsKey(config->peer_uuids, entry.first)) {
- PrintCell("", peer_width);
+// Formats the peers known and unknown to 'config' using labels from 'peer_uuid_mapping'.
+string format_peers(const map<string, char>& peer_uuid_mapping, const KsckConsensusState& config) {
+ stringstream voters;
+ int peer_width = 4;
+ for (const auto &entry : peer_uuid_mapping) {
+ if (!ContainsKey(config.peer_uuids, entry.first)) {
+ voters << setw(peer_width) << left << "";
continue;
}
- if (config->leader_uuid && config->leader_uuid == entry.first) {
- PrintCell(Substitute("$0*", entry.second), peer_width);
+ if (config.leader_uuid && config.leader_uuid == entry.first) {
+ voters << setw(peer_width) << left << Substitute("$0*", entry.second);
} else {
- PrintCell(Substitute("$0", entry.second), peer_width);
- }
- }
-
- string term_str = "";
- if (config->term) {
- term_str = Substitute("$0", config->term.get());
- }
- Out() << col_pad << setw(column_widths[2]) << left << term_str;
-
- string opid_str = "";
- if (config->opid_index) {
- opid_str = Substitute("$0", config->opid_index.get());
- }
- Out() << col_pad << setw(column_widths[3]) << left << opid_str;
-
- string committed_str = "";
- switch (config->type) {
- case KsckConsensusConfigType::PENDING:
- committed_str = "No"; break;
- default:
- committed_str = "Yes"; break;
- }
- Out() << col_pad << setw(column_widths[4]) << left << committed_str;
- Out() << endl;
-}
-
-void PrintConsensusMatrix(const map<string, char>& peer_uuid_mapping,
- const KsckConsensusState& master_config,
- const vector<ReplicaInfo>& replica_infos) {
- const vector<string> columns{"Host", "Voters", "Current term", "Config index", "Committed?"};
- const string master_label = "config from master";
-
- // Compute the column widths.
- vector<size_t> column_widths{master_label.size() + 1, // + 1 for the :
- peer_width * peer_uuid_mapping.size(),
- columns[2].size(),
- columns[3].size(),
- columns[4].size(),
- };
-
- // Print the header rows.
- const string col_pad = " ";
- Out() << col_pad;
- for (int i = 0; i < columns.size(); i++) {
- Out() << left << setw(column_widths[i]) << columns[i];
- if (i < columns.size() - 1) {
- Out() << col_pad;
- }
- }
- Out() << endl;
- Out() << col_pad;
- for (int i = 0; i < column_widths.size(); i++) {
- Out() << string(column_widths[i], '-');
- if (i < column_widths.size() - 1) {
- Out() << col_pad;
+ voters << setw(peer_width) << left << Substitute("$0", entry.second);
}
}
- Out() << endl;
-
- // Print the master row, then the tserver rows.
- PrintConfig(peer_uuid_mapping, master_label, master_config, column_widths);
- for (const auto& replica : replica_infos) {
- char label = FindOrDie(peer_uuid_mapping, replica.ts->uuid());
- PrintConfig(peer_uuid_mapping,
- string(1, label),
- replica.consensus_state,
- column_widths);
- }
+ return voters.str();
}
} // anonymous namespace
@@ -907,8 +829,40 @@ Ksck::CheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int t
Out() << " " << entry.second << " = " << entry.first << endl;
}
Out() << endl;
- Out() << " The consensus matrix is:" << endl;
- PrintConsensusMatrix(peer_uuid_mapping, master_config, replica_infos);
+ Out() << "The consensus matrix is:" << endl;
+
+ // Prepare the header and columns for PrintTable.
+ const vector<string> headers{ "Config source", "Voters", "Current term",
+ "Config index", "Committed?" };
+
+ // Seed the columns with the master info.
+ vector<string> sources{"master"};
+ vector<string> voters{format_peers(peer_uuid_mapping, master_config)};
+ vector<string> terms{""};
+ vector<string> indexes{""};
+ vector<string> committed{"Yes"};
+
+ // Fill out the columns with info from the replicas.
+ for (const auto& replica : replica_infos) {
+ char label = FindOrDie(peer_uuid_mapping, replica.ts->uuid());
+ sources.push_back(string(1, label));
+ if (!replica.consensus_state) {
+ voters.push_back("[config not available]");
+ terms.push_back("");
+ indexes.push_back("");
+ committed.push_back("");
+ continue;
+ }
+ voters.push_back(format_peers(peer_uuid_mapping, replica.consensus_state.get()));
+ terms.push_back(replica.consensus_state->term ?
+ std::to_string(replica.consensus_state->term.get()) : "");
+ indexes.push_back(replica.consensus_state->opid_index ?
+ std::to_string(replica.consensus_state->opid_index.get()) : "");
+ committed.push_back(replica.consensus_state->type == KsckConsensusConfigType::PENDING ?
+ "No" : "Yes");
+ }
+ vector<vector<string>> columns{ sources, voters, terms, indexes, committed };
+ PrintTable(headers, columns, Out());
}
return result;
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 79ae3b6..89531a8 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -110,6 +110,7 @@ using server::SetFlagRequestPB;
using server::SetFlagResponsePB;
using std::cout;
using std::endl;
+using std::ostream;
using std::setfill;
using std::setw;
using std::shared_ptr;
@@ -381,7 +382,9 @@ namespace {
// dd23284d3a334f1a8306c19d89c1161f | 130.rack1.dc1.example.com:7050 | 1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30 | 136.rack1.dc1.example.com:7050 | 1492596696557549
// c108a85a68504c2bb9f49e4ee683d981 | 128.rack1.dc1.example.com:7050 | 1492596646623301
-void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+void PrettyPrintTable(const vector<string>& headers,
+ const vector<vector<string>>& columns,
+ ostream& out) {
CHECK_EQ(headers.size(), columns.size());
if (headers.empty()) return;
size_t num_columns = headers.size();
@@ -398,32 +401,32 @@ void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>
// Print the header row.
for (int col = 0; col < num_columns; col++) {
int padding = widths[col] - headers[col].size();
- cout << setw(padding / 2) << "" << " " << headers[col];
- if (col != num_columns - 1) cout << setw((padding + 1) / 2) << "" << " |";
+ out << setw(padding / 2) << "" << " " << headers[col];
+ if (col != num_columns - 1) out << setw((padding + 1) / 2) << "" << " |";
}
- cout << endl;
+ out << endl;
// Print the separator row.
- cout << setfill('-');
+ out << setfill('-');
for (int col = 0; col < num_columns; col++) {
- cout << setw(widths[col] + 2) << "";
- if (col != num_columns - 1) cout << "+";
+ out << setw(widths[col] + 2) << "";
+ if (col != num_columns - 1) out << "+";
}
- cout << endl;
+ out << endl;
// Print the data rows.
- cout << setfill(' ');
+ out << setfill(' ');
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
const auto& value = columns[col][row];
- cout << " " << value;
+ out << " " << value;
if (col != num_columns - 1) {
size_t padding = widths[col] - value.size();
- cout << setw(padding) << "" << " |";
+ out << setw(padding) << "" << " |";
}
}
- cout << endl;
+ out << endl;
}
}
@@ -431,7 +434,9 @@ void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>
//
// The table is formatted as an array of objects. Each object corresponds
// to a row whose fields are the column values.
-void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+void JsonPrintTable(const vector<string>& headers,
+ const vector<vector<string>>& columns,
+ ostream& out) {
std::ostringstream stream;
JsonWriter writer(&stream, JsonWriter::COMPACT);
@@ -449,7 +454,7 @@ void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>&
}
writer.EndArray();
- cout << stream.str() << endl;
+ out << stream.str() << endl;
}
// Print the table using the provided separator. For example, with a comma
@@ -460,32 +465,34 @@ void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>&
// dd23284d3a334f1a8306c19d89c1161f,130.rack1.dc1.example.com:7050,1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30,136.rack1.dc1.example.com:7050,1492596696557549
// c108a85a68504c2bb9f49e4ee683d981,128.rack1.dc1.example.com:7050,1492596646623301
-void PrintTable(const vector<vector<string>>& columns, const string& separator) {
+void PrintTable(const vector<vector<string>>& columns, const string& separator, ostream& out) {
// TODO(dan): proper escaping of string values.
int num_columns = columns.size();
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
- cout << columns[col][row];
- if (col != num_columns - 1) cout << separator;
+ out << columns[col][row];
+ if (col != num_columns - 1) out << separator;
}
- cout << endl;
+ out << endl;
}
}
} // anonymous namespace
-Status PrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+Status PrintTable(const vector<string>& headers,
+ const vector<vector<string>>& columns,
+ ostream& out) {
if (boost::iequals(FLAGS_format, "pretty")) {
- PrettyPrintTable(headers, columns);
+ PrettyPrintTable(headers, columns, out);
} else if (boost::iequals(FLAGS_format, "space")) {
- PrintTable(columns, " ");
+ PrintTable(columns, " ", out);
} else if (boost::iequals(FLAGS_format, "tsv")) {
- PrintTable(columns, " ");
+ PrintTable(columns, " ", out);
} else if (boost::iequals(FLAGS_format, "csv")) {
- PrintTable(columns, ",");
+ PrintTable(columns, ",", out);
} else if (boost::iequals(FLAGS_format, "json")) {
- JsonPrintTable(headers, columns);
+ JsonPrintTable(headers, columns, out);
} else {
return Status::InvalidArgument("unknown format (--format)", FLAGS_format);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index a0e39ef..439ba4f 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+#include <ostream>
#include <string>
#include <vector>
@@ -107,7 +108,8 @@ Status SetServerFlag(const std::string& address, uint16_t default_port,
// Prints a table.
Status PrintTable(const std::vector<std::string>& headers,
- const std::vector<std::vector<std::string>>& columns);
+ const std::vector<std::vector<std::string>>& columns,
+ std::ostream& out);
// Wrapper around a Kudu client which allows calling proxy methods on the leader
// master.
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 1dcce1d..b6d7704 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -17,6 +17,7 @@
#include "kudu/tools/tool_action.h"
+#include <iostream>
#include <memory>
#include <string>
#include <utility>
@@ -42,6 +43,7 @@ namespace kudu {
using master::ListMastersRequestPB;
using master::ListMastersResponsePB;
using master::MasterServiceProxy;
+using std::cout;
using std::string;
using std::unique_ptr;
@@ -138,7 +140,7 @@ Status ListMasters(const RunnerContext& context) {
columns.emplace_back(std::move(values));
}
- RETURN_NOT_OK(PrintTable(headers, columns));
+ RETURN_NOT_OK(PrintTable(headers, columns, cout));
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_tserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_tserver.cc b/src/kudu/tools/tool_action_tserver.cc
index 0352ae8..4a5efe0 100644
--- a/src/kudu/tools/tool_action_tserver.cc
+++ b/src/kudu/tools/tool_action_tserver.cc
@@ -18,6 +18,7 @@
#include "kudu/tools/tool_action.h"
#include <functional>
+#include <iostream>
#include <memory>
#include <string>
#include <utility>
@@ -38,6 +39,7 @@
DECLARE_string(columns);
+using std::cout;
using std::string;
using std::unique_ptr;
@@ -136,7 +138,7 @@ Status ListTServers(const RunnerContext& context) {
columns.emplace_back(std::move(values));
}
- RETURN_NOT_OK(PrintTable(headers, columns));
+ RETURN_NOT_OK(PrintTable(headers, columns, cout));
return Status::OK();
}
[3/3] kudu git commit: consensus: Add DCHECK(is_locked()) in all
unlocked methods
Posted by mp...@apache.org.
consensus: Add DCHECK(is_locked()) in all unlocked methods
Now that RaftConsensus owns 'lock_' we can easily implement these kinds
of basic hygenic checks.
Change-Id: I3364607e6778e6495e40d7ad36ea1850b022c444
Reviewed-on: http://gerrit.cloudera.org:8080/7013
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fbaa7dce
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fbaa7dce
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fbaa7dce
Branch: refs/heads/master
Commit: fbaa7dce968e2a6965effaa9d1097c34649d65f4
Parents: 3846861
Author: Mike Percy <mp...@apache.org>
Authored: Sat May 27 15:01:08 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 1 20:45:17 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/raft_consensus.cc | 42 +++++++++++++++++++++++++++++++
src/kudu/consensus/raft_consensus.h | 4 +--
2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fbaa7dce/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 6a2c39f..b7f5cdc 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -533,6 +533,8 @@ void RaftConsensus::ReportFailureDetected(const std::string& name, const Status&
}
Status RaftConsensus::BecomeLeaderUnlocked() {
+ DCHECK(lock_.is_locked());
+
TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
"peer", peer_uuid_,
"tablet", options_.tablet_id);
@@ -566,6 +568,8 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
}
Status RaftConsensus::BecomeReplicaUnlocked() {
+ DCHECK(lock_.is_locked());
+
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
<< ToStringUnlocked();
@@ -610,6 +614,8 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
}
Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
+ DCHECK(lock_.is_locked());
+
*round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
RETURN_NOT_OK(AddPendingOperationUnlocked(round));
@@ -621,6 +627,8 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
}
Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
+ DCHECK(lock_.is_locked());
+
// If we are adding a pending config change, we need to propagate it to the
// metadata.
if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
@@ -794,6 +802,8 @@ static bool IsConsensusOnlyOperation(OperationType op_type) {
}
Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg) {
+ DCHECK(lock_.is_locked());
+
if (IsConsensusOnlyOperation(msg->get()->op_type())) {
return StartConsensusOnlyRoundUnlocked(msg);
}
@@ -841,6 +851,8 @@ std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
LeaderRequest* deduplicated_req) {
+ DCHECK(lock_.is_locked());
+
// TODO(todd): use queue committed index?
int64_t last_committed_index = pending_.GetCommittedIndex();
@@ -903,6 +915,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
ConsensusResponsePB* response) {
+ DCHECK(lock_.is_locked());
// Do term checks first:
if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
@@ -928,6 +941,7 @@ Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB*
Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
ConsensusResponsePB* response) {
+ DCHECK(lock_.is_locked());
bool term_mismatch;
if (pending_.IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
@@ -967,6 +981,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
}
void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
+ DCHECK(lock_.is_locked());
pending_.AbortOpsAfter(truncate_after_index);
queue_->TruncateOpsAfter(truncate_after_index);
}
@@ -974,6 +989,7 @@ void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_inde
Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
ConsensusResponsePB* response,
LeaderRequest* deduped_req) {
+ DCHECK(lock_.is_locked());
if (request->has_deprecated_committed_index() ||
!request->has_all_replicated_index()) {
@@ -1373,6 +1389,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
}
void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
+ DCHECK(lock_.is_locked());
TRACE("Filling consensus response to leader.");
response->set_responder_term(GetCurrentTermUnlocked());
response->mutable_status()->mutable_last_received()->CopyFrom(
@@ -1796,6 +1813,7 @@ OpId RaftConsensus::GetLatestOpIdFromLog() {
}
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
+ DCHECK(lock_.is_locked());
OperationType op_type = msg->get()->op_type();
CHECK(IsConsensusOnlyOperation(op_type))
<< "Expected a consensus-only op type, got " << OperationType_Name(op_type)
@@ -1821,6 +1839,7 @@ Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
}
std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
+ DCHECK(lock_.is_locked());
return Substitute("$0Leader $1election vote request",
LogPrefixUnlocked(),
request.is_pre_election() ? "pre-" : "");
@@ -1981,6 +2000,7 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
const RaftConfigPB& new_config,
const StatusCallback& client_cb) {
+ DCHECK(lock_.is_locked());
auto cc_replicate = new ReplicateMsg();
cc_replicate->set_op_type(CHANGE_CONFIG_OP);
ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
@@ -2001,6 +2021,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
}
Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
+ DCHECK(lock_.is_locked());
DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
const RaftConfigPB& active_config = GetActiveConfigUnlocked();
@@ -2264,6 +2285,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
}
void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
+ DCHECK(lock_.is_locked());
const OpId& op_id = round->replicate_msg()->id();
if (!status.ok()) {
@@ -2322,6 +2344,7 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
Status RaftConsensus::EnsureFailureDetectorEnabledUnlocked() {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
return Status::OK();
}
@@ -2335,6 +2358,7 @@ Status RaftConsensus::EnsureFailureDetectorEnabledUnlocked() {
}
Status RaftConsensus::EnsureFailureDetectorDisabledUnlocked() {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
return Status::OK();
}
@@ -2346,6 +2370,7 @@ Status RaftConsensus::EnsureFailureDetectorDisabledUnlocked() {
}
Status RaftConsensus::ExpireFailureDetectorUnlocked() {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
return Status::OK();
}
@@ -2354,11 +2379,13 @@ Status RaftConsensus::ExpireFailureDetectorUnlocked() {
}
Status RaftConsensus::SnoozeFailureDetectorUnlocked() {
+ DCHECK(lock_.is_locked());
return SnoozeFailureDetectorUnlocked(MonoDelta::FromMicroseconds(0), DO_NOT_LOG);
}
Status RaftConsensus::SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
AllowLogging allow_logging) {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
return Status::OK();
}
@@ -2380,6 +2407,7 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const {
}
MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
+ DCHECK(lock_.is_locked());
// Compute a backoff factor based on how many leader elections have
// failed since a stable leader was last seen.
double backoff_factor = pow(1.5, failed_elections_since_stable_leader_ + 1);
@@ -2400,6 +2428,7 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
FlushToDisk flush) {
+ DCHECK(lock_.is_locked());
if (new_term <= GetCurrentTermUnlocked()) {
return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
new_term, GetCurrentTermUnlocked()));
@@ -2434,6 +2463,7 @@ Status RaftConsensus::CheckRunningUnlocked() const {
}
Status RaftConsensus::CheckActiveLeaderUnlocked() const {
+ DCHECK(lock_.is_locked());
RaftPeerPB::Role role = GetActiveRoleUnlocked();
switch (role) {
case RaftPeerPB::LEADER:
@@ -2449,6 +2479,7 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
}
ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const {
+ DCHECK(lock_.is_locked());
return cmeta_->ToConsensusStatePB();
}
@@ -2494,6 +2525,7 @@ Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
}
void RaftConsensus::ClearPendingConfigUnlocked() {
+ DCHECK(lock_.is_locked());
cmeta_->clear_pending_config();
}
@@ -2573,6 +2605,16 @@ const string& RaftConsensus::GetLeaderUuidUnlocked() const {
return cmeta_->leader_uuid();
}
+bool RaftConsensus::HasLeaderUnlocked() const {
+ DCHECK(lock_.is_locked());
+ return !GetLeaderUuidUnlocked().empty();
+}
+
+void RaftConsensus::ClearLeaderUnlocked() {
+ DCHECK(lock_.is_locked());
+ SetLeaderUuidUnlocked("");
+}
+
const bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
DCHECK(lock_.is_locked());
return cmeta_->has_voted_for();
http://git-wip-us.apache.org/repos/asf/kudu/blob/fbaa7dce/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b9348c2..f00a8c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -528,8 +528,8 @@ class RaftConsensus : public Consensus,
// Accessors for the leader of the current term.
const std::string& GetLeaderUuidUnlocked() const;
- bool HasLeaderUnlocked() const { return !GetLeaderUuidUnlocked().empty(); }
- void ClearLeaderUnlocked() { SetLeaderUuidUnlocked(""); }
+ bool HasLeaderUnlocked() const;
+ void ClearLeaderUnlocked();
// Return whether this peer has voted in the current term.
const bool HasVotedCurrentTermUnlocked() const;
[2/3] kudu git commit: consensus: Get rid of LockFor*() methods
Posted by mp...@apache.org.
consensus: Get rid of LockFor*() methods
Simplify the locking logic by removing layers of abstraction.
Also add State_Name() helper for state-related error messages.
Change-Id: I6858752f4fbeb70b09eb4375c52e4aeaa1bb8e71
Reviewed-on: http://gerrit.cloudera.org:8080/7012
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3846861a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3846861a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3846861a
Branch: refs/heads/master
Commit: 3846861ab258a0ac0497893865875b2138964fe3
Parents: 30682fd
Author: Mike Percy <mp...@apache.org>
Authored: Tue May 30 14:00:53 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 1 20:44:45 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/raft_consensus.cc | 298 +++++++++----------
src/kudu/consensus/raft_consensus.h | 68 ++---
.../consensus/raft_consensus_quorum-test.cc | 9 +-
3 files changed, 169 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c5370c7..6a2c39f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -278,8 +278,11 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
failure_detector_));
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForStart(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
+ << State_Name(state_);
+
ClearLeaderUnlocked();
// Our last persisted term can be higher than the last persisted operation
@@ -310,8 +313,9 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
@@ -350,15 +354,19 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
}
bool RaftConsensus::IsRunning() const {
- UniqueLock lock;
- Status s = LockForRead(&lock);
- if (PREDICT_FALSE(!s.ok())) return false;
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return state_ == kRunning;
}
Status RaftConsensus::EmulateElection() {
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
@@ -404,8 +412,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
"mode", mode_str);
scoped_refptr<LeaderElection> election;
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
if (active_role == RaftPeerPB::LEADER) {
@@ -413,7 +422,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
return Status::OK();
}
if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
- SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
+ RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this state.
return Status::IllegalState("Not starting election: Node is currently "
"a non-participant in the raft config",
SecureShortDebugString(GetActiveConfigUnlocked()));
@@ -499,8 +508,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
StatusToPB(Status::IllegalState("Not currently leader"),
@@ -580,8 +590,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
std::lock_guard<simple_spinlock> lock(update_lock_);
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
}
@@ -591,8 +602,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
}
Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
round->BindToTerm(GetCurrentTermUnlocked());
return Status::OK();
}
@@ -655,12 +667,19 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
}
void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
- UniqueLock lock;
- Status s = LockForCommit(&lock);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe()
- << "Unable to take state lock to update committed index: "
- << s.ToString();
+ TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
+ "tablet", options_.tablet_id,
+ "commit_index", commit_index);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ // We will process commit notifications while shutting down because a replica
+ // which has initiated a Prepare() / Replicate() may eventually commit even if
+ // its state has changed after the initial Append() / Update().
+ if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+ LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
+ << "Replica not in running state: "
+ << State_Name(state_);
return;
}
@@ -672,11 +691,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
}
void RaftConsensus::NotifyTermChange(int64_t term) {
- UniqueLock lock;
- Status s = LockForConfigChange(&lock);
+ TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
+ "tablet", options_.tablet_id,
+ "term", term);
+
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ Status s = CheckRunningUnlocked();
if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change"
- << " when notified of new term " << term << ": " << s.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term "
+ << "(" << term << "): " << s.ToString();
return;
}
WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
@@ -697,14 +721,8 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
RaftConfigPB committed_config;
{
- UniqueLock lock;
- Status s = LockForRead(&lock);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
- << "Unable to lock consensus for read: " << s.ToString();
- return;
- }
-
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
int64_t current_term = GetCurrentTermUnlocked();
if (current_term != term) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
@@ -794,8 +812,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
const RaftConfigPB& config = GetCommittedConfigUnlocked();
const string& uuid = peer_uuid_;
if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
@@ -1142,8 +1160,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
LeaderRequest deduped_req;
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForUpdate(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
+ if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
+ }
deduped_req.leader_uuid = request->caller_uuid();
@@ -1333,9 +1355,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
// If just waiting for our log append to finish lets snooze the timer.
// We don't want to fire leader election because we're waiting on our own log.
if (s.IsTimedOut()) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
- SnoozeFailureDetectorUnlocked();
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(SnoozeFailureDetectorUnlocked());
}
} while (s.IsTimedOut());
RETURN_NOT_OK(s);
@@ -1393,14 +1415,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// timeouts, just vote a quick NO.
//
// We still need to take the state lock in order to respond with term info, etc.
- UniqueLock state_guard;
- RETURN_NOT_OK(LockForConfigChange(&state_guard));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
return RequestVoteRespondIsBusy(request, response);
}
// Acquire the replica state lock so we can read / modify the consensus state.
- UniqueLock state_guard;
- RETURN_NOT_OK(LockForConfigChange(&state_guard));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
// If the node is not in the configuration, allow the vote (this is required by Raft)
// but log an informational message anyway.
@@ -1480,6 +1504,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
const StatusCallback& client_cb,
boost::optional<TabletServerErrorPB::Code>* error_code) {
+ TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
if (PREDICT_FALSE(!req.has_type())) {
return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()",
SecureShortDebugString(req));
@@ -1492,8 +1520,9 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
ChangeConfigType type = req.type();
const RaftPeerPB& server = req.server();
{
- UniqueLock lock;
- RETURN_NOT_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ RETURN_NOT_OK(CheckRunningUnlocked());
RETURN_NOT_OK(CheckActiveLeaderUnlocked());
RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
@@ -1599,8 +1628,8 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
{
// Take the snapshot of the replica state and queue state so that
// we can stick them in the consensus update request later.
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
current_term = GetCurrentTermUnlocked();
committed_config = GetCommittedConfigUnlocked();
if (IsConfigChangePendingUnlocked()) {
@@ -1719,6 +1748,10 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
}
void RaftConsensus::Shutdown() {
+ TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
+ "peer", peer_uuid_,
+ "tablet", options_.tablet_id);
+
// Avoid taking locks if already shut down so we don't violate
// ThreadRestrictions assertions in the case where the RaftConsensus
// destructor runs on the reactor thread due to an election callback being
@@ -1726,9 +1759,11 @@ void RaftConsensus::Shutdown() {
if (shutdown_.Load(kMemOrderAcquire)) return;
{
- UniqueLock lock;
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
// Transition to kShuttingDown state.
- CHECK_OK(LockForShutdown(&lock));
+ CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
+ state_ = kShuttingDown;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
}
@@ -1739,10 +1774,10 @@ void RaftConsensus::Shutdown() {
queue_->Close();
{
- UniqueLock lock;
- CHECK_OK(LockForShutdown(&lock));
- CHECK_EQ(kShuttingDown, state_);
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
CHECK_OK(pending_.CancelPendingTransactions());
+ CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
state_ = kShutDown;
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
}
@@ -1779,8 +1814,9 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
}
Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
- UniqueLock lock;
- CHECK_OK(LockForConfigChange(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
+ CHECK_OK(CheckRunningUnlocked());
return HandleTermAdvanceUnlocked(new_term);
}
@@ -1914,11 +1950,27 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
}
RaftPeerPB::Role RaftConsensus::role() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return GetActiveRoleUnlocked();
}
+const char* RaftConsensus::State_Name(State state) {
+ switch (state) {
+ case kInitialized:
+ return "Initialized";
+ case kRunning:
+ return "Running";
+ case kShuttingDown:
+ return "Shutting down";
+ case kShutDown:
+ return "Shut down";
+ default:
+ LOG(DFATAL) << "Unknown State value: " << state;
+ return "Unknown";
+ }
+}
+
void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
DCHECK(lock_.is_locked());
failed_elections_since_stable_leader_ = 0;
@@ -1975,14 +2027,14 @@ const string& RaftConsensus::tablet_id() const {
}
ConsensusStatePB RaftConsensus::ConsensusState() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return ConsensusStateUnlocked();
}
RaftConfigPB RaftConsensus::CommittedConfig() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return GetCommittedConfigUnlocked();
}
@@ -1997,8 +2049,8 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
// Dump the queues on a leader.
RaftPeerPB::Role role;
{
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
role = GetActiveRoleUnlocked();
}
if (role == RaftPeerPB::LEADER) {
@@ -2026,8 +2078,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
// Snooze to avoid the election timer firing again as much as possible.
{
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
// We need to snooze when we win and when we lose:
// - When we win because we're about to disable the timer and become leader.
// - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2064,12 +2116,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
// The vote was granted, become leader.
- UniqueLock lock;
- Status s = LockForConfigChange(&lock);
+ ThreadRestrictions::AssertWaitAllowed();
+ UniqueLock lock(lock_);
+ Status s = CheckRunningUnlocked();
if (PREDICT_FALSE(!s.ok())) {
- LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term "
- << election_term << " while not running: "
- << s.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term "
+ << election_term << " while not running: "
+ << s.ToString();
return;
}
@@ -2137,8 +2190,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
- UniqueLock lock;
- RETURN_NOT_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
if (type == RECEIVED_OPID) {
*DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
} else if (type == COMMITTED_OPID) {
@@ -2192,13 +2245,13 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
// the client callback.
if (!status.ok()) {
- LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: "
- << status.ToString();
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
+ << status.ToString();
client_cb.Run(status);
return;
}
- VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
- << round->id();
+ VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
+ << round->id();
gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
commit_msg->set_op_type(round->replicate_msg()->op_type());
*commit_msg->mutable_commited_op_id() = round->id();
@@ -2364,82 +2417,19 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
return Status::OK();
}
-Status RaftConsensus::LockForStart(UniqueLock* lock) const {
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
- << " Replica is not in kInitialized state";
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForRead(UniqueLock* lock) const {
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
- ThreadRestrictions::AssertWaitAllowed();
+Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
+ DCHECK(lock_.is_locked());
DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Replica not in running state");
- }
-
- RETURN_NOT_OK(CheckActiveLeaderUnlocked());
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
- return Status::IllegalState("Replica not in running state");
- }
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Replica not in running state");
- }
- if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
- LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
- }
- lock->swap(l);
- return Status::OK();
+ RETURN_NOT_OK(CheckRunningUnlocked());
+ return CheckActiveLeaderUnlocked();
}
-Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
-
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- // Can only change the config on running replicas.
+Status RaftConsensus::CheckRunningUnlocked() const {
+ DCHECK(lock_.is_locked());
if (PREDICT_FALSE(state_ != kRunning)) {
- return Status::IllegalState("Unable to lock ReplicaState for config change",
- Substitute("State = $0", state_));
+ return Status::IllegalState("RaftConsensus is not running",
+ Substitute("State = $0", State_Name(state_)));
}
- lock->swap(l);
- return Status::OK();
-}
-
-Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
- TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
- ThreadRestrictions::AssertWaitAllowed();
- UniqueLock l(lock_);
- if (state_ != kShuttingDown && state_ != kShutDown) {
- state_ = kShuttingDown;
- }
- lock->swap(l);
return Status::OK();
}
@@ -2608,8 +2598,8 @@ const ConsensusOptions& RaftConsensus::GetOptions() const {
}
string RaftConsensus::LogPrefix() const {
- UniqueLock lock;
- CHECK_OK(LockForRead(&lock));
+ ThreadRestrictions::AssertWaitAllowed();
+ LockGuard l(lock_);
return LogPrefixUnlocked();
}
@@ -2630,14 +2620,14 @@ string RaftConsensus::LogPrefixThreadSafe() const {
string RaftConsensus::ToString() const {
ThreadRestrictions::AssertWaitAllowed();
- UniqueLock lock(lock_);
+ LockGuard l(lock_);
return ToStringUnlocked();
}
string RaftConsensus::ToStringUnlocked() const {
DCHECK(lock_.is_locked());
return Substitute("Replica: $0, State: $1, Role: $2",
- peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+ peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
}
ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b1badde..b9348c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -35,9 +35,6 @@
namespace kudu {
-typedef std::lock_guard<simple_spinlock> Lock;
-typedef gscoped_ptr<Lock> ScopedLock;
-
class Counter;
class FailureDetector;
class HostPort;
@@ -62,8 +59,6 @@ struct ElectionResult;
class RaftConsensus : public Consensus,
public PeerMessageQueueObserver {
public:
- typedef std::unique_lock<simple_spinlock> UniqueLock;
-
static scoped_refptr<RaftConsensus> Create(
ConsensusOptions options,
std::unique_ptr<ConsensusMetadata> cmeta,
@@ -181,6 +176,8 @@ class RaftConsensus : public Consensus,
FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
+ // NOTE: When adding / changing values in this enum, add the corresponding
+ // values to State_Name().
enum State {
// State after the replica is built.
kInitialized,
@@ -224,6 +221,12 @@ class RaftConsensus : public Consensus,
std::string OpsRangeString() const;
};
+ using LockGuard = std::lock_guard<simple_spinlock>;
+ using UniqueLock = std::unique_lock<simple_spinlock>;
+
+ // Returns string description for State enum value.
+ static const char* State_Name(State state);
+
// Set the leader UUID of the configuration and mark the tablet config dirty for
// reporting to the master.
void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -276,7 +279,8 @@ class RaftConsensus : public Consensus,
// pending operations, we proactively abort those pending operations after and including
// the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache.
Status EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
- ConsensusResponsePB* response);
+ ConsensusResponsePB* response)
+ WARN_UNUSED_RESULT;
// Check a request received from a leader, making sure:
// - The request is in the right term
@@ -288,7 +292,7 @@ class RaftConsensus : public Consensus,
// the messages to add to our state machine.
Status CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
ConsensusResponsePB* response,
- LeaderRequest* deduped_req);
+ LeaderRequest* deduped_req) WARN_UNUSED_RESULT;
// Abort any pending operations after the given op index,
// and also truncate the LogCache accordingly.
@@ -382,13 +386,13 @@ class RaftConsensus : public Consensus,
// When this is called a failure is guaranteed not to be detected
// before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
// 'FLAGS_raft_heartbeat_interval_ms' has elapsed.
- Status SnoozeFailureDetectorUnlocked();
+ Status SnoozeFailureDetectorUnlocked() WARN_UNUSED_RESULT;
// Like the above but adds 'additional_delta' to the default timeout
// period. If allow_logging is set to ALLOW_LOGGING, then this method
// will print a log message when called.
Status SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
- AllowLogging allow_logging);
+ AllowLogging allow_logging) WARN_UNUSED_RESULT;
// Return the minimum election timeout. Due to backoff and random
// jitter, election timeouts may be longer than this.
@@ -462,45 +466,17 @@ class RaftConsensus : public Consensus,
// (see Diego Ongaro's thesis section 4.1).
Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
- // Locks a replica in preparation for StartUnlocked(). Makes
- // sure the replica is in kInitialized state.
- Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Obtains the lock for a state read, does not check state.
- Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until the critical section of an append completes,
- // i.e. until the replicate message has been assigned an id and placed in
- // the log queue.
- // This also checks that the replica is in the appropriate
- // state (role) to replicate the provided operation, that the operation
- // contains a replicate message and is of the appropriate type, and returns
- // Status::IllegalState if that is not the case.
- Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until the critical section of a commit completes.
- // This succeeds for all states since a replica which has initiated
- // a Prepare()/Replicate() must eventually commit even if it's state
- // has changed after the initial Append()/Update().
- Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Locks a replica down until an the critical section of an update completes.
- // Further updates from the same or some other leader will be blocked until
- // this completes. This also checks that the replica is in the appropriate
- // state (role) to be updated and returns Status::IllegalState if that
- // is not the case.
- Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
- // Changes the role to non-participant and returns a lock that can be
- // used to make sure no state updates come in until Shutdown() is
- // completed.
- Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+ // Checks that the replica is in the appropriate state and role to replicate
+ // the provided operation and that the replicate message does not yet have an
+ // OpId assigned.
+ Status CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+ // Return Status::IllegalState if 'state_' != kRunning, OK otherwise.
+ Status CheckRunningUnlocked() const WARN_UNUSED_RESULT;
// Ensure the local peer is the active leader.
// Returns OK if leader, IllegalState otherwise.
- Status CheckActiveLeaderUnlocked() const;
+ Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
// Return current consensus state summary.
ConsensusStatePB ConsensusStateUnlocked() const;
@@ -515,7 +491,7 @@ class RaftConsensus : public Consensus,
// Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
// currently *no* configuration change pending, and IllegalState is there *is* a
// configuration change pending.
- Status CheckNoConfigChangePendingUnlocked() const;
+ Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT;
// Sets the given configuration as pending commit. Does not persist into the peers
// metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index bb1575e..a0933c8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -660,8 +660,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- RaftConsensus::UniqueLock lock;
- ASSERT_OK(follower0->LockForRead(&lock));
+ RaftConsensus::LockGuard l(follower0->lock_);
// If the locked replica would stop consensus we would hang here
// as we wait for operations to be replicated to a majority.
@@ -703,13 +702,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
// and never letting them go.
scoped_refptr<RaftConsensus> follower0;
CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
- RaftConsensus::UniqueLock lock0;
- ASSERT_OK(follower0->LockForRead(&lock0));
+ RaftConsensus::LockGuard l_0(follower0->lock_);
scoped_refptr<RaftConsensus> follower1;
CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
- RaftConsensus::UniqueLock lock1;
- ASSERT_OK(follower1->LockForRead(&lock1));
+ RaftConsensus::LockGuard l_1(follower1->lock_);
// Append a single message to the queue
ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));