You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/06/01 20:50:59 UTC

[1/3] kudu git commit: [tools] Use PrintTable to format ksck's consensus matrix

Repository: kudu
Updated Branches:
  refs/heads/master ec81bd9d2 -> fbaa7dce9


[tools] Use PrintTable to format ksck's consensus matrix

The first version of ksck's consensus matrix used its own table
code, but actually there was already nice table code available
in tool_action_common.h. This switches ksck to use it. It also
generalizes the table code to output to a generic ostream
instead of only cout; this was necessary for ksck-test but might
be useful in other ways later.

Change-Id: I8d77005f20091e6778702580e3a269d9689c5b0a
Reviewed-on: http://gerrit.cloudera.org:8080/7043
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/30682fd1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/30682fd1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/30682fd1

Branch: refs/heads/master
Commit: 30682fd13ffc68ebaeb8200e7450f8317b89bf85
Parents: ec81bd9
Author: Will Berkeley <wd...@apache.org>
Authored: Thu Jun 1 10:18:15 2017 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Thu Jun 1 19:50:28 2017 +0000

----------------------------------------------------------------------
 src/kudu/tools/ksck-test.cc           |  36 ++++----
 src/kudu/tools/ksck.cc                | 144 ++++++++++-------------------
 src/kudu/tools/tool_action_common.cc  |  55 ++++++-----
 src/kudu/tools/tool_action_common.h   |   4 +-
 src/kudu/tools/tool_action_master.cc  |   4 +-
 src/kudu/tools/tool_action_tserver.cc |   4 +-
 6 files changed, 107 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/ksck-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck-test.cc b/src/kudu/tools/ksck-test.cc
index bdf1789..90a8f1d 100644
--- a/src/kudu/tools/ksck-test.cc
+++ b/src/kudu/tools/ksck-test.cc
@@ -406,12 +406,12 @@ TEST_F(KsckTest, TestConsensusConflictExtraPeer) {
   ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
   ASSERT_STR_CONTAINS(err_stream_.str(),
       "The consensus matrix is:\n"
-      "  Host                 Voters            Current term  Config index  Committed?\n"
-      "  -------------------  ----------------  ------------  ------------  ----------\n"
-      "  config from master:  A*  B   C                                     Yes       \n"
-      "                   A:  A*  B   C   D     0                           Yes       \n"
-      "                   B:  A*  B   C         0                           Yes       \n"
-      "                   C:  A*  B   C         0                           Yes");
+      " Config source |      Voters      | Current term | Config index | Committed?\n"
+      "---------------+------------------+--------------+--------------+------------\n"
+      " master        | A*  B   C        |              |              | Yes\n"
+      " A             | A*  B   C   D    | 0            |              | Yes\n"
+      " B             | A*  B   C        | 0            |              | Yes\n"
+      " C             | A*  B   C        | 0            |              | Yes");
 }
 
 TEST_F(KsckTest, TestConsensusConflictMissingPeer) {
@@ -427,12 +427,12 @@ TEST_F(KsckTest, TestConsensusConflictMissingPeer) {
   ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
   ASSERT_STR_CONTAINS(err_stream_.str(),
       "The consensus matrix is:\n"
-      "  Host                 Voters        Current term  Config index  Committed?\n"
-      "  -------------------  ------------  ------------  ------------  ----------\n"
-      "  config from master:  A*  B   C                                 Yes       \n"
-      "                   A:  A*  B         0                           Yes       \n"
-      "                   B:  A*  B   C     0                           Yes       \n"
-      "                   C:  A*  B   C     0                           Yes");
+      " Config source |    Voters    | Current term | Config index | Committed?\n"
+      "---------------+--------------+--------------+--------------+------------\n"
+      " master        | A*  B   C    |              |              | Yes\n"
+      " A             | A*  B        | 0            |              | Yes\n"
+      " B             | A*  B   C    | 0            |              | Yes\n"
+      " C             | A*  B   C    | 0            |              | Yes");
 }
 
 TEST_F(KsckTest, TestConsensusConflictDifferentLeader) {
@@ -448,12 +448,12 @@ TEST_F(KsckTest, TestConsensusConflictDifferentLeader) {
   ASSERT_EQ("Corruption: 1 table(s) are bad", s.ToString());
   ASSERT_STR_CONTAINS(err_stream_.str(),
       "The consensus matrix is:\n"
-      "  Host                 Voters        Current term  Config index  Committed?\n"
-      "  -------------------  ------------  ------------  ------------  ----------\n"
-      "  config from master:  A*  B   C                                 Yes       \n"
-      "                   A:  A   B*  C     0                           Yes       \n"
-      "                   B:  A*  B   C     0                           Yes       \n"
-      "                   C:  A*  B   C     0                           Yes");
+      " Config source |    Voters    | Current term | Config index | Committed?\n"
+      "---------------+--------------+--------------+--------------+------------\n"
+      " master        | A*  B   C    |              |              | Yes\n"
+      " A             | A   B*  C    | 0            |              | Yes\n"
+      " B             | A*  B   C    | 0            |              | Yes\n"
+      " C             | A*  B   C    | 0            |              | Yes");
 }
 
 TEST_F(KsckTest, TestOneOneTabletBrokenTable) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/ksck.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/ksck.cc b/src/kudu/tools/ksck.cc
index 7361841..a99d4c1 100644
--- a/src/kudu/tools/ksck.cc
+++ b/src/kudu/tools/ksck.cc
@@ -23,6 +23,7 @@
 #include <iostream>
 #include <map>
 #include <mutex>
+#include <sstream>
 
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/map-util.h"
@@ -32,6 +33,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/tools/color.h"
+#include "kudu/tools/tool_action_common.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/locks.h"
@@ -66,6 +68,7 @@ using std::right;
 using std::setw;
 using std::shared_ptr;
 using std::string;
+using std::stringstream;
 using std::unordered_map;
 using strings::Substitute;
 
@@ -600,7 +603,7 @@ bool Ksck::VerifyTable(const shared_ptr<KsckTable>& table) {
 
 namespace {
 
-// A struct in which to consolidate the state of each replica for easier analysis.
+// A struct consolidating the state of each replica, for easier analysis.
 struct ReplicaInfo {
   KsckTabletReplica* replica;
   KsckTabletServer* ts = nullptr;
@@ -609,103 +612,22 @@ struct ReplicaInfo {
   boost::optional<KsckConsensusState> consensus_state;
 };
 
-// Formatting constants.
-const char col_pad[] = "  ";
-const int peer_width = 4;
-
-// Print a cell of the consensus matrix.
-void PrintCell(const string& val, size_t width) {
-  Out() << setw(width) << left << val;
-}
-
-// Print a row (one config) of the consensus matrix.
-void PrintConfig(const map<string, char>& peer_uuid_mapping,
-                 const string& name,
-                 const boost::optional<KsckConsensusState>& config,
-                 const vector<size_t>& column_widths) {
-  Out() << col_pad << setw(column_widths[0]) << right << Substitute("$0:", name);
-  Out() << col_pad;
-  if (!config) {
-    Out() << "[no config retrieved]" << endl;
-    return;
-  }
-  for (const auto& entry : peer_uuid_mapping) {
-    if (!ContainsKey(config->peer_uuids, entry.first)) {
-      PrintCell("", peer_width);
+// Formats the peers known and unknown to 'config' using labels from 'peer_uuid_mapping'.
+string format_peers(const map<string, char>& peer_uuid_mapping, const KsckConsensusState& config) {
+  stringstream voters;
+  int peer_width = 4;
+  for (const auto &entry : peer_uuid_mapping) {
+    if (!ContainsKey(config.peer_uuids, entry.first)) {
+      voters << setw(peer_width) << left << "";
       continue;
     }
-    if (config->leader_uuid && config->leader_uuid == entry.first) {
-      PrintCell(Substitute("$0*", entry.second), peer_width);
+    if (config.leader_uuid && config.leader_uuid == entry.first) {
+      voters << setw(peer_width) << left << Substitute("$0*", entry.second);
     } else {
-      PrintCell(Substitute("$0", entry.second), peer_width);
-    }
-  }
-
-  string term_str = "";
-  if (config->term) {
-    term_str = Substitute("$0", config->term.get());
-  }
-  Out() << col_pad << setw(column_widths[2]) << left << term_str;
-
-  string opid_str = "";
-  if (config->opid_index) {
-    opid_str = Substitute("$0", config->opid_index.get());
-  }
-  Out() << col_pad << setw(column_widths[3]) << left << opid_str;
-
-  string committed_str = "";
-  switch (config->type) {
-    case KsckConsensusConfigType::PENDING:
-      committed_str = "No"; break;
-    default:
-      committed_str = "Yes"; break;
-  }
-  Out() << col_pad << setw(column_widths[4]) << left << committed_str;
-  Out() << endl;
-}
-
-void PrintConsensusMatrix(const map<string, char>& peer_uuid_mapping,
-                          const KsckConsensusState& master_config,
-                          const vector<ReplicaInfo>& replica_infos) {
-  const vector<string> columns{"Host", "Voters", "Current term", "Config index", "Committed?"};
-  const string master_label = "config from master";
-
-  // Compute the column widths.
-  vector<size_t> column_widths{master_label.size() + 1, // + 1 for the :
-                               peer_width * peer_uuid_mapping.size(),
-                               columns[2].size(),
-                               columns[3].size(),
-                               columns[4].size(),
-  };
-
-  // Print the header rows.
-  const string col_pad = "  ";
-  Out() << col_pad;
-  for (int i = 0; i < columns.size(); i++) {
-    Out() << left << setw(column_widths[i]) << columns[i];
-    if (i < columns.size() - 1) {
-      Out() << col_pad;
-    }
-  }
-  Out() << endl;
-  Out() << col_pad;
-  for (int i = 0; i < column_widths.size(); i++) {
-    Out() << string(column_widths[i], '-');
-    if (i < column_widths.size() - 1) {
-      Out() << col_pad;
+      voters << setw(peer_width) << left << Substitute("$0", entry.second);
     }
   }
-  Out() << endl;
-
-  // Print the master row, then the tserver rows.
-  PrintConfig(peer_uuid_mapping, master_label, master_config, column_widths);
-  for (const auto& replica : replica_infos) {
-    char label = FindOrDie(peer_uuid_mapping, replica.ts->uuid());
-    PrintConfig(peer_uuid_mapping,
-                string(1, label),
-                replica.consensus_state,
-                column_widths);
-  }
+  return voters.str();
 }
 
 } // anonymous namespace
@@ -907,8 +829,40 @@ Ksck::CheckResult Ksck::VerifyTablet(const shared_ptr<KsckTablet>& tablet, int t
       Out() << "  " << entry.second << " = " << entry.first << endl;
     }
     Out() << endl;
-    Out() << "  The consensus matrix is:" << endl;
-    PrintConsensusMatrix(peer_uuid_mapping, master_config, replica_infos);
+    Out() << "The consensus matrix is:" << endl;
+
+    // Prepare the header and columns for PrintTable.
+    const vector<string> headers{ "Config source", "Voters", "Current term",
+                                  "Config index", "Committed?" };
+
+    // Seed the columns with the master info.
+    vector<string> sources{"master"};
+    vector<string> voters{format_peers(peer_uuid_mapping, master_config)};
+    vector<string> terms{""};
+    vector<string> indexes{""};
+    vector<string> committed{"Yes"};
+
+    // Fill out the columns with info from the replicas.
+    for (const auto& replica : replica_infos) {
+      char label = FindOrDie(peer_uuid_mapping, replica.ts->uuid());
+      sources.push_back(string(1, label));
+      if (!replica.consensus_state) {
+        voters.push_back("[config not available]");
+        terms.push_back("");
+        indexes.push_back("");
+        committed.push_back("");
+        continue;
+      }
+      voters.push_back(format_peers(peer_uuid_mapping, replica.consensus_state.get()));
+      terms.push_back(replica.consensus_state->term ?
+                      std::to_string(replica.consensus_state->term.get()) : "");
+      indexes.push_back(replica.consensus_state->opid_index ?
+                        std::to_string(replica.consensus_state->opid_index.get()) : "");
+      committed.push_back(replica.consensus_state->type == KsckConsensusConfigType::PENDING ?
+                          "No" : "Yes");
+    }
+    vector<vector<string>> columns{ sources, voters, terms, indexes, committed };
+    PrintTable(headers, columns, Out());
   }
 
   return result;

http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 79ae3b6..89531a8 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -110,6 +110,7 @@ using server::SetFlagRequestPB;
 using server::SetFlagResponsePB;
 using std::cout;
 using std::endl;
+using std::ostream;
 using std::setfill;
 using std::setw;
 using std::shared_ptr;
@@ -381,7 +382,9 @@ namespace {
 //  dd23284d3a334f1a8306c19d89c1161f | 130.rack1.dc1.example.com:7050 | 1492596704536543
 //  d8009e07d82b4e66a7ab50f85e60bc30 | 136.rack1.dc1.example.com:7050 | 1492596696557549
 //  c108a85a68504c2bb9f49e4ee683d981 | 128.rack1.dc1.example.com:7050 | 1492596646623301
-void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+void PrettyPrintTable(const vector<string>& headers,
+                      const vector<vector<string>>& columns,
+                      ostream& out) {
   CHECK_EQ(headers.size(), columns.size());
   if (headers.empty()) return;
   size_t num_columns = headers.size();
@@ -398,32 +401,32 @@ void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>
   // Print the header row.
   for (int col = 0; col < num_columns; col++) {
     int padding = widths[col] - headers[col].size();
-    cout << setw(padding / 2) << "" << " " << headers[col];
-    if (col != num_columns - 1) cout << setw((padding + 1) / 2) << "" << " |";
+    out << setw(padding / 2) << "" << " " << headers[col];
+    if (col != num_columns - 1) out << setw((padding + 1) / 2) << "" << " |";
   }
-  cout << endl;
+  out << endl;
 
   // Print the separator row.
-  cout << setfill('-');
+  out << setfill('-');
   for (int col = 0; col < num_columns; col++) {
-    cout << setw(widths[col] + 2) << "";
-    if (col != num_columns - 1) cout << "+";
+    out << setw(widths[col] + 2) << "";
+    if (col != num_columns - 1) out << "+";
   }
-  cout << endl;
+  out << endl;
 
   // Print the data rows.
-  cout << setfill(' ');
+  out << setfill(' ');
   int num_rows = columns.empty() ? 0 : columns[0].size();
   for (int row = 0; row < num_rows; row++) {
     for (int col = 0; col < num_columns; col++) {
       const auto& value = columns[col][row];
-      cout << " " << value;
+      out << " " << value;
       if (col != num_columns - 1) {
         size_t padding = widths[col] - value.size();
-        cout << setw(padding) << "" << " |";
+        out << setw(padding) << "" << " |";
       }
     }
-    cout << endl;
+    out << endl;
   }
 }
 
@@ -431,7 +434,9 @@ void PrettyPrintTable(const vector<string>& headers, const vector<vector<string>
 //
 // The table is formatted as an array of objects. Each object corresponds
 // to a row whose fields are the column values.
-void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+void JsonPrintTable(const vector<string>& headers,
+                    const vector<vector<string>>& columns,
+                    ostream& out) {
   std::ostringstream stream;
   JsonWriter writer(&stream, JsonWriter::COMPACT);
 
@@ -449,7 +454,7 @@ void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>&
   }
   writer.EndArray();
 
-  cout << stream.str() << endl;
+  out << stream.str() << endl;
 }
 
 // Print the table using the provided separator. For example, with a comma
@@ -460,32 +465,34 @@ void JsonPrintTable(const vector<string>& headers, const vector<vector<string>>&
 // dd23284d3a334f1a8306c19d89c1161f,130.rack1.dc1.example.com:7050,1492596704536543
 // d8009e07d82b4e66a7ab50f85e60bc30,136.rack1.dc1.example.com:7050,1492596696557549
 // c108a85a68504c2bb9f49e4ee683d981,128.rack1.dc1.example.com:7050,1492596646623301
-void PrintTable(const vector<vector<string>>& columns, const string& separator) {
+void PrintTable(const vector<vector<string>>& columns, const string& separator, ostream& out) {
   // TODO(dan): proper escaping of string values.
   int num_columns = columns.size();
   int num_rows = columns.empty() ? 0 : columns[0].size();
   for (int row = 0; row < num_rows; row++) {
       for (int col = 0; col < num_columns; col++) {
-        cout << columns[col][row];
-        if (col != num_columns - 1) cout << separator;
+        out << columns[col][row];
+        if (col != num_columns - 1) out << separator;
       }
-      cout << endl;
+      out << endl;
   }
 }
 
 } // anonymous namespace
 
-Status PrintTable(const vector<string>& headers, const vector<vector<string>>& columns) {
+Status PrintTable(const vector<string>& headers,
+                  const vector<vector<string>>& columns,
+                  ostream& out) {
   if (boost::iequals(FLAGS_format, "pretty")) {
-    PrettyPrintTable(headers, columns);
+    PrettyPrintTable(headers, columns, out);
   } else if (boost::iequals(FLAGS_format, "space")) {
-    PrintTable(columns, " ");
+    PrintTable(columns, " ", out);
   } else if (boost::iequals(FLAGS_format, "tsv")) {
-    PrintTable(columns, "	");
+    PrintTable(columns, "	", out);
   } else if (boost::iequals(FLAGS_format, "csv")) {
-    PrintTable(columns, ",");
+    PrintTable(columns, ",", out);
   } else if (boost::iequals(FLAGS_format, "json")) {
-    JsonPrintTable(headers, columns);
+    JsonPrintTable(headers, columns, out);
   } else {
     return Status::InvalidArgument("unknown format (--format)", FLAGS_format);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index a0e39ef..439ba4f 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <memory>
+#include <ostream>
 #include <string>
 #include <vector>
 
@@ -107,7 +108,8 @@ Status SetServerFlag(const std::string& address, uint16_t default_port,
 
 // Prints a table.
 Status PrintTable(const std::vector<std::string>& headers,
-                  const std::vector<std::vector<std::string>>& columns);
+                  const std::vector<std::vector<std::string>>& columns,
+                  std::ostream& out);
 
 // Wrapper around a Kudu client which allows calling proxy methods on the leader
 // master.

http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 1dcce1d..b6d7704 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/tools/tool_action.h"
 
+#include <iostream>
 #include <memory>
 #include <string>
 #include <utility>
@@ -42,6 +43,7 @@ namespace kudu {
 using master::ListMastersRequestPB;
 using master::ListMastersResponsePB;
 using master::MasterServiceProxy;
+using std::cout;
 using std::string;
 using std::unique_ptr;
 
@@ -138,7 +140,7 @@ Status ListMasters(const RunnerContext& context) {
     columns.emplace_back(std::move(values));
   }
 
-  RETURN_NOT_OK(PrintTable(headers, columns));
+  RETURN_NOT_OK(PrintTable(headers, columns, cout));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/30682fd1/src/kudu/tools/tool_action_tserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_tserver.cc b/src/kudu/tools/tool_action_tserver.cc
index 0352ae8..4a5efe0 100644
--- a/src/kudu/tools/tool_action_tserver.cc
+++ b/src/kudu/tools/tool_action_tserver.cc
@@ -18,6 +18,7 @@
 #include "kudu/tools/tool_action.h"
 
 #include <functional>
+#include <iostream>
 #include <memory>
 #include <string>
 #include <utility>
@@ -38,6 +39,7 @@
 
 DECLARE_string(columns);
 
+using std::cout;
 using std::string;
 using std::unique_ptr;
 
@@ -136,7 +138,7 @@ Status ListTServers(const RunnerContext& context) {
     columns.emplace_back(std::move(values));
   }
 
-  RETURN_NOT_OK(PrintTable(headers, columns));
+  RETURN_NOT_OK(PrintTable(headers, columns, cout));
   return Status::OK();
 }
 


[3/3] kudu git commit: consensus: Add DCHECK(is_locked()) in all unlocked methods

Posted by mp...@apache.org.
consensus: Add DCHECK(is_locked()) in all unlocked methods

Now that RaftConsensus owns 'lock_' we can easily implement these kinds
of basic hygienic checks.

Change-Id: I3364607e6778e6495e40d7ad36ea1850b022c444
Reviewed-on: http://gerrit.cloudera.org:8080/7013
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fbaa7dce
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fbaa7dce
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fbaa7dce

Branch: refs/heads/master
Commit: fbaa7dce968e2a6965effaa9d1097c34649d65f4
Parents: 3846861
Author: Mike Percy <mp...@apache.org>
Authored: Sat May 27 15:01:08 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 1 20:45:17 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc | 42 +++++++++++++++++++++++++++++++
 src/kudu/consensus/raft_consensus.h  |  4 +--
 2 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fbaa7dce/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 6a2c39f..b7f5cdc 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -533,6 +533,8 @@ void RaftConsensus::ReportFailureDetected(const std::string& name, const Status&
 }
 
 Status RaftConsensus::BecomeLeaderUnlocked() {
+  DCHECK(lock_.is_locked());
+
   TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
                "peer", peer_uuid_,
                "tablet", options_.tablet_id);
@@ -566,6 +568,8 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
 }
 
 Status RaftConsensus::BecomeReplicaUnlocked() {
+  DCHECK(lock_.is_locked());
+
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
                                  << ToStringUnlocked();
 
@@ -610,6 +614,8 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
 }
 
 Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) {
+  DCHECK(lock_.is_locked());
+
   *round->replicate_msg()->mutable_id() = queue_->GetNextOpId();
   RETURN_NOT_OK(AddPendingOperationUnlocked(round));
 
@@ -621,6 +627,8 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
 }
 
 Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
+  DCHECK(lock_.is_locked());
+
   // If we are adding a pending config change, we need to propagate it to the
   // metadata.
   if (PREDICT_FALSE(round->replicate_msg()->op_type() == CHANGE_CONFIG_OP)) {
@@ -794,6 +802,8 @@ static bool IsConsensusOnlyOperation(OperationType op_type) {
 }
 
 Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg) {
+  DCHECK(lock_.is_locked());
+
   if (IsConsensusOnlyOperation(msg->get()->op_type())) {
     return StartConsensusOnlyRoundUnlocked(msg);
   }
@@ -841,6 +851,8 @@ std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
 
 void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req,
                                                      LeaderRequest* deduplicated_req) {
+  DCHECK(lock_.is_locked());
+
   // TODO(todd): use queue committed index?
   int64_t last_committed_index = pending_.GetCommittedIndex();
 
@@ -903,6 +915,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
 
 Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
                                                       ConsensusResponsePB* response) {
+  DCHECK(lock_.is_locked());
   // Do term checks first:
   if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
 
@@ -928,6 +941,7 @@ Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB*
 
 Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
                                                                 ConsensusResponsePB* response) {
+  DCHECK(lock_.is_locked());
 
   bool term_mismatch;
   if (pending_.IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
@@ -967,6 +981,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
 }
 
 void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
+  DCHECK(lock_.is_locked());
   pending_.AbortOpsAfter(truncate_after_index);
   queue_->TruncateOpsAfter(truncate_after_index);
 }
@@ -974,6 +989,7 @@ void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_inde
 Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                                  ConsensusResponsePB* response,
                                                  LeaderRequest* deduped_req) {
+  DCHECK(lock_.is_locked());
 
   if (request->has_deprecated_committed_index() ||
       !request->has_all_replicated_index()) {
@@ -1373,6 +1389,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 }
 
 void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
+  DCHECK(lock_.is_locked());
   TRACE("Filling consensus response to leader.");
   response->set_responder_term(GetCurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
@@ -1796,6 +1813,7 @@ OpId RaftConsensus::GetLatestOpIdFromLog() {
 }
 
 Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
+  DCHECK(lock_.is_locked());
   OperationType op_type = msg->get()->op_type();
   CHECK(IsConsensusOnlyOperation(op_type))
       << "Expected a consensus-only op type, got " << OperationType_Name(op_type)
@@ -1821,6 +1839,7 @@ Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
 }
 
 std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
+  DCHECK(lock_.is_locked());
   return Substitute("$0Leader $1election vote request",
                     LogPrefixUnlocked(),
                     request.is_pre_election() ? "pre-" : "");
@@ -1981,6 +2000,7 @@ void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
 Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
                                                     const RaftConfigPB& new_config,
                                                     const StatusCallback& client_cb) {
+  DCHECK(lock_.is_locked());
   auto cc_replicate = new ReplicateMsg();
   cc_replicate->set_op_type(CHANGE_CONFIG_OP);
   ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
@@ -2001,6 +2021,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
 }
 
 Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
+  DCHECK(lock_.is_locked());
   DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
   const RaftConfigPB& active_config = GetActiveConfigUnlocked();
 
@@ -2264,6 +2285,7 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
 }
 
 void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, const Status& status) {
+  DCHECK(lock_.is_locked());
   const OpId& op_id = round->replicate_msg()->id();
 
   if (!status.ok()) {
@@ -2322,6 +2344,7 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
 
 
 Status RaftConsensus::EnsureFailureDetectorEnabledUnlocked() {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
     return Status::OK();
   }
@@ -2335,6 +2358,7 @@ Status RaftConsensus::EnsureFailureDetectorEnabledUnlocked() {
 }
 
 Status RaftConsensus::EnsureFailureDetectorDisabledUnlocked() {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
     return Status::OK();
   }
@@ -2346,6 +2370,7 @@ Status RaftConsensus::EnsureFailureDetectorDisabledUnlocked() {
 }
 
 Status RaftConsensus::ExpireFailureDetectorUnlocked() {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
     return Status::OK();
   }
@@ -2354,11 +2379,13 @@ Status RaftConsensus::ExpireFailureDetectorUnlocked() {
 }
 
 Status RaftConsensus::SnoozeFailureDetectorUnlocked() {
+  DCHECK(lock_.is_locked());
   return SnoozeFailureDetectorUnlocked(MonoDelta::FromMicroseconds(0), DO_NOT_LOG);
 }
 
 Status RaftConsensus::SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
                                                     AllowLogging allow_logging) {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(!FLAGS_enable_leader_failure_detection)) {
     return Status::OK();
   }
@@ -2380,6 +2407,7 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const {
 }
 
 MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
+  DCHECK(lock_.is_locked());
   // Compute a backoff factor based on how many leader elections have
   // failed since a stable leader was last seen.
   double backoff_factor = pow(1.5, failed_elections_since_stable_leader_ + 1);
@@ -2400,6 +2428,7 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
 
 Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
                                                 FlushToDisk flush) {
+  DCHECK(lock_.is_locked());
   if (new_term <= GetCurrentTermUnlocked()) {
     return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
                                            new_term, GetCurrentTermUnlocked()));
@@ -2434,6 +2463,7 @@ Status RaftConsensus::CheckRunningUnlocked() const {
 }
 
 Status RaftConsensus::CheckActiveLeaderUnlocked() const {
+  DCHECK(lock_.is_locked());
   RaftPeerPB::Role role = GetActiveRoleUnlocked();
   switch (role) {
     case RaftPeerPB::LEADER:
@@ -2449,6 +2479,7 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
 }
 
 ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const {
+  DCHECK(lock_.is_locked());
   return cmeta_->ToConsensusStatePB();
 }
 
@@ -2494,6 +2525,7 @@ Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
 }
 
 void RaftConsensus::ClearPendingConfigUnlocked() {
+  DCHECK(lock_.is_locked());
   cmeta_->clear_pending_config();
 }
 
@@ -2573,6 +2605,16 @@ const string& RaftConsensus::GetLeaderUuidUnlocked() const {
   return cmeta_->leader_uuid();
 }
 
+bool RaftConsensus::HasLeaderUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return !GetLeaderUuidUnlocked().empty();
+}
+
+void RaftConsensus::ClearLeaderUnlocked() {
+  DCHECK(lock_.is_locked());
+  SetLeaderUuidUnlocked("");
+}
+
 const bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   return cmeta_->has_voted_for();

http://git-wip-us.apache.org/repos/asf/kudu/blob/fbaa7dce/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b9348c2..f00a8c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -528,8 +528,8 @@ class RaftConsensus : public Consensus,
 
   // Accessors for the leader of the current term.
   const std::string& GetLeaderUuidUnlocked() const;
-  bool HasLeaderUnlocked() const { return !GetLeaderUuidUnlocked().empty(); }
-  void ClearLeaderUnlocked() { SetLeaderUuidUnlocked(""); }
+  bool HasLeaderUnlocked() const;
+  void ClearLeaderUnlocked();
 
   // Return whether this peer has voted in the current term.
   const bool HasVotedCurrentTermUnlocked() const;


[2/3] kudu git commit: consensus: Get rid of LockFor*() methods

Posted by mp...@apache.org.
consensus: Get rid of LockFor*() methods

Simplify the locking logic by removing layers of abstraction.

Also add State_Name() helper for state-related error messages.

Change-Id: I6858752f4fbeb70b09eb4375c52e4aeaa1bb8e71
Reviewed-on: http://gerrit.cloudera.org:8080/7012
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3846861a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3846861a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3846861a

Branch: refs/heads/master
Commit: 3846861ab258a0ac0497893865875b2138964fe3
Parents: 30682fd
Author: Mike Percy <mp...@apache.org>
Authored: Tue May 30 14:00:53 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 1 20:44:45 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            | 298 +++++++++----------
 src/kudu/consensus/raft_consensus.h             |  68 ++---
 .../consensus/raft_consensus_quorum-test.cc     |   9 +-
 3 files changed, 169 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c5370c7..6a2c39f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -278,8 +278,11 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
                                                         failure_detector_));
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForStart(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
+                                   << State_Name(state_);
+
     ClearLeaderUnlocked();
 
     // Our last persisted term can be higher than the last persisted operation
@@ -310,8 +313,9 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
   }
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
 
     RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
 
@@ -350,15 +354,19 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
 }
 
 bool RaftConsensus::IsRunning() const {
-  UniqueLock lock;
-  Status s = LockForRead(&lock);
-  if (PREDICT_FALSE(!s.ok())) return false;
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return state_ == kRunning;
 }
 
 Status RaftConsensus::EmulateElection() {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForConfigChange(&lock));
+  TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
 
@@ -404,8 +412,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
                "mode", mode_str);
   scoped_refptr<LeaderElection> election;
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
 
     RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
     if (active_role == RaftPeerPB::LEADER) {
@@ -413,7 +422,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       return Status::OK();
     }
     if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
-      SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
+      RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this state.
       return Status::IllegalState("Not starting election: Node is currently "
                                   "a non-participant in the raft config",
                                   SecureShortDebugString(GetActiveConfigUnlocked()));
@@ -499,8 +508,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
 
 Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForConfigChange(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
   if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
     resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
     StatusToPB(Status::IllegalState("Not currently leader"),
@@ -580,8 +590,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
 
   std::lock_guard<simple_spinlock> lock(update_lock_);
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
     RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
     RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
   }
@@ -591,8 +602,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
 }
 
 Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
   round->BindToTerm(GetCurrentTermUnlocked());
   return Status::OK();
 }
@@ -655,12 +667,19 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
 }
 
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
-  UniqueLock lock;
-  Status s = LockForCommit(&lock);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << LogPrefixThreadSafe()
-        << "Unable to take state lock to update committed index: "
-        << s.ToString();
+  TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex",
+               "tablet", options_.tablet_id,
+               "commit_index", commit_index);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  // We will process commit notifications while shutting down because a replica
+  // which has initiated a Prepare() / Replicate() may eventually commit even if
+  // its state has changed after the initial Append() / Update().
+  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
+                                      << "Replica not in running state: "
+                                      << State_Name(state_);
     return;
   }
 
@@ -672,11 +691,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
 }
 
 void RaftConsensus::NotifyTermChange(int64_t term) {
-  UniqueLock lock;
-  Status s = LockForConfigChange(&lock);
+  TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange",
+               "tablet", options_.tablet_id,
+               "term", term);
+
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  Status s = CheckRunningUnlocked();
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change"
-                 << " when notified of new term " << term << ": " << s.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term "
+                                      << "(" << term << "): " << s.ToString();
     return;
   }
   WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
@@ -697,14 +721,8 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
 
   RaftConfigPB committed_config;
   {
-    UniqueLock lock;
-    Status s = LockForRead(&lock);
-    if (PREDICT_FALSE(!s.ok())) {
-      LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
-                   << "Unable to lock consensus for read: " << s.ToString();
-      return;
-    }
-
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     int64_t current_term = GetCurrentTermUnlocked();
     if (current_term != term) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
@@ -794,8 +812,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
 }
 
 Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   const RaftConfigPB& config = GetCommittedConfigUnlocked();
   const string& uuid = peer_uuid_;
   if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
@@ -1142,8 +1160,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
   LeaderRequest deduped_req;
 
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForUpdate(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
+    if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
+    }
 
     deduped_req.leader_uuid = request->caller_uuid();
 
@@ -1333,9 +1355,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // If just waiting for our log append to finish lets snooze the timer.
       // We don't want to fire leader election because we're waiting on our own log.
       if (s.IsTimedOut()) {
-        UniqueLock lock;
-        RETURN_NOT_OK(LockForRead(&lock));
-        SnoozeFailureDetectorUnlocked();
+        ThreadRestrictions::AssertWaitAllowed();
+        LockGuard l(lock_);
+        RETURN_NOT_OK(SnoozeFailureDetectorUnlocked());
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
@@ -1393,14 +1415,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
     // timeouts, just vote a quick NO.
     //
     // We still need to take the state lock in order to respond with term info, etc.
-    UniqueLock state_guard;
-    RETURN_NOT_OK(LockForConfigChange(&state_guard));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
     return RequestVoteRespondIsBusy(request, response);
   }
 
   // Acquire the replica state lock so we can read / modify the consensus state.
-  UniqueLock state_guard;
-  RETURN_NOT_OK(LockForConfigChange(&state_guard));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  RETURN_NOT_OK(CheckRunningUnlocked());
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
@@ -1480,6 +1504,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                                    const StatusCallback& client_cb,
                                    boost::optional<TabletServerErrorPB::Code>* error_code) {
+  TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
   if (PREDICT_FALSE(!req.has_type())) {
     return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()",
                                    SecureShortDebugString(req));
@@ -1492,8 +1520,9 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
   ChangeConfigType type = req.type();
   const RaftPeerPB& server = req.server();
   {
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForConfigChange(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
+    RETURN_NOT_OK(CheckRunningUnlocked());
     RETURN_NOT_OK(CheckActiveLeaderUnlocked());
     RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
 
@@ -1599,8 +1628,8 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   {
     // Take the snapshot of the replica state and queue state so that
     // we can stick them in the consensus update request later.
-    UniqueLock lock;
-    RETURN_NOT_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     current_term = GetCurrentTermUnlocked();
     committed_config = GetCommittedConfigUnlocked();
     if (IsConfigChangePendingUnlocked()) {
@@ -1719,6 +1748,10 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
 }
 
 void RaftConsensus::Shutdown() {
+  TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+
   // Avoid taking locks if already shut down so we don't violate
   // ThreadRestrictions assertions in the case where the RaftConsensus
   // destructor runs on the reactor thread due to an election callback being
@@ -1726,9 +1759,11 @@ void RaftConsensus::Shutdown() {
   if (shutdown_.Load(kMemOrderAcquire)) return;
 
   {
-    UniqueLock lock;
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     // Transition to kShuttingDown state.
-    CHECK_OK(LockForShutdown(&lock));
+    CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
+    state_ = kShuttingDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
   }
 
@@ -1739,10 +1774,10 @@ void RaftConsensus::Shutdown() {
   queue_->Close();
 
   {
-    UniqueLock lock;
-    CHECK_OK(LockForShutdown(&lock));
-    CHECK_EQ(kShuttingDown, state_);
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     CHECK_OK(pending_.CancelPendingTransactions());
+    CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
     state_ = kShutDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
@@ -1779,8 +1814,9 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
 }
 
 Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
-  UniqueLock lock;
-  CHECK_OK(LockForConfigChange(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  CHECK_OK(CheckRunningUnlocked());
   return HandleTermAdvanceUnlocked(new_term);
 }
 
@@ -1914,11 +1950,27 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
 }
 
 RaftPeerPB::Role RaftConsensus::role() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return GetActiveRoleUnlocked();
 }
 
+const char* RaftConsensus::State_Name(State state) {
+  switch (state) {
+    case kInitialized:
+      return "Initialized";
+    case kRunning:
+      return "Running";
+    case kShuttingDown:
+      return "Shutting down";
+    case kShutDown:
+      return "Shut down";
+    default:
+      LOG(DFATAL) << "Unknown State value: " << state;
+      return "Unknown";
+  }
+}
+
 void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
   DCHECK(lock_.is_locked());
   failed_elections_since_stable_leader_ = 0;
@@ -1975,14 +2027,14 @@ const string& RaftConsensus::tablet_id() const {
 }
 
 ConsensusStatePB RaftConsensus::ConsensusState() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return ConsensusStateUnlocked();
 }
 
 RaftConfigPB RaftConsensus::CommittedConfig() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return GetCommittedConfigUnlocked();
 }
 
@@ -1997,8 +2049,8 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   // Dump the queues on a leader.
   RaftPeerPB::Role role;
   {
-    UniqueLock lock;
-    CHECK_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     role = GetActiveRoleUnlocked();
   }
   if (role == RaftPeerPB::LEADER) {
@@ -2026,8 +2078,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
 
   // Snooze to avoid the election timer firing again as much as possible.
   {
-    UniqueLock lock;
-    CHECK_OK(LockForRead(&lock));
+    ThreadRestrictions::AssertWaitAllowed();
+    LockGuard l(lock_);
     // We need to snooze when we win and when we lose:
     // - When we win because we're about to disable the timer and become leader.
     // - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2064,12 +2116,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
   }
 
   // The vote was granted, become leader.
-  UniqueLock lock;
-  Status s = LockForConfigChange(&lock);
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock lock(lock_);
+  Status s = CheckRunningUnlocked();
   if (PREDICT_FALSE(!s.ok())) {
-    LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term "
-                          << election_term << " while not running: "
-                          << s.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term "
+                                   << election_term << " while not running: "
+                                   << s.ToString();
     return;
   }
 
@@ -2137,8 +2190,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
 }
 
 Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
-  UniqueLock lock;
-  RETURN_NOT_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
@@ -2192,13 +2245,13 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   // the client callback.
 
   if (!status.ok()) {
-    LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: "
-              << status.ToString();
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: "
+                                   << status.ToString();
     client_cb.Run(status);
     return;
   }
-  VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
-          << round->id();
+  VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
+                               << round->id();
   gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
   commit_msg->set_op_type(round->replicate_msg()->op_type());
   *commit_msg->mutable_commited_op_id() = round->id();
@@ -2364,82 +2417,19 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
   return Status::OK();
 }
 
-Status RaftConsensus::LockForStart(UniqueLock* lock) const {
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
-      << " Replica is not in kInitialized state";
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForRead(UniqueLock* lock) const {
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
-  ThreadRestrictions::AssertWaitAllowed();
+Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const {
+  DCHECK(lock_.is_locked());
   DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-
-  RETURN_NOT_OK(CheckActiveLeaderUnlocked());
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-  if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
-  }
-  lock->swap(l);
-  return Status::OK();
+  RETURN_NOT_OK(CheckRunningUnlocked());
+  return CheckActiveLeaderUnlocked();
 }
 
-Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
-
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  // Can only change the config on running replicas.
+Status RaftConsensus::CheckRunningUnlocked() const {
+  DCHECK(lock_.is_locked());
   if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Unable to lock ReplicaState for config change",
-                                Substitute("State = $0", state_));
+    return Status::IllegalState("RaftConsensus is not running",
+                                Substitute("State = $0", State_Name(state_)));
   }
-  lock->swap(l);
-  return Status::OK();
-}
-
-Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
-  TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(lock_);
-  if (state_ != kShuttingDown && state_ != kShutDown) {
-    state_ = kShuttingDown;
-  }
-  lock->swap(l);
   return Status::OK();
 }
 
@@ -2608,8 +2598,8 @@ const ConsensusOptions& RaftConsensus::GetOptions() const {
 }
 
 string RaftConsensus::LogPrefix() const {
-  UniqueLock lock;
-  CHECK_OK(LockForRead(&lock));
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
   return LogPrefixUnlocked();
 }
 
@@ -2630,14 +2620,14 @@ string RaftConsensus::LogPrefixThreadSafe() const {
 
 string RaftConsensus::ToString() const {
   ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock lock(lock_);
+  LockGuard l(lock_);
   return ToStringUnlocked();
 }
 
 string RaftConsensus::ToStringUnlocked() const {
   DCHECK(lock_.is_locked());
   return Substitute("Replica: $0, State: $1, Role: $2",
-                    peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+                    peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
 }
 
 ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b1badde..b9348c2 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -35,9 +35,6 @@
 
 namespace kudu {
 
-typedef std::lock_guard<simple_spinlock> Lock;
-typedef gscoped_ptr<Lock> ScopedLock;
-
 class Counter;
 class FailureDetector;
 class HostPort;
@@ -62,8 +59,6 @@ struct ElectionResult;
 class RaftConsensus : public Consensus,
                       public PeerMessageQueueObserver {
  public:
-  typedef std::unique_lock<simple_spinlock> UniqueLock;
-
   static scoped_refptr<RaftConsensus> Create(
     ConsensusOptions options,
     std::unique_ptr<ConsensusMetadata> cmeta,
@@ -181,6 +176,8 @@ class RaftConsensus : public Consensus,
   FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
   FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
 
+  // NOTE: When adding / changing values in this enum, add the corresponding
+  // values to State_Name().
   enum State {
     // State after the replica is built.
     kInitialized,
@@ -224,6 +221,12 @@ class RaftConsensus : public Consensus,
     std::string OpsRangeString() const;
   };
 
+  using LockGuard = std::lock_guard<simple_spinlock>;
+  using UniqueLock = std::unique_lock<simple_spinlock>;
+
+  // Returns string description for State enum value.
+  static const char* State_Name(State state);
+
   // Set the leader UUID of the configuration and mark the tablet config dirty for
   // reporting to the master.
   void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -276,7 +279,8 @@ class RaftConsensus : public Consensus,
   // pending operations, we proactively abort those pending operations after and including
   // the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache.
   Status EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req,
-                                                   ConsensusResponsePB* response);
+                                                   ConsensusResponsePB* response)
+         WARN_UNUSED_RESULT;
 
   // Check a request received from a leader, making sure:
   // - The request is in the right term
@@ -288,7 +292,7 @@ class RaftConsensus : public Consensus,
   // the messages to add to our state machine.
   Status CheckLeaderRequestUnlocked(const ConsensusRequestPB* request,
                                     ConsensusResponsePB* response,
-                                    LeaderRequest* deduped_req);
+                                    LeaderRequest* deduped_req) WARN_UNUSED_RESULT;
 
   // Abort any pending operations after the given op index,
   // and also truncate the LogCache accordingly.
@@ -382,13 +386,13 @@ class RaftConsensus : public Consensus,
   // When this is called a failure is guaranteed not to be detected
   // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' *
   // 'FLAGS_raft_heartbeat_interval_ms' has elapsed.
-  Status SnoozeFailureDetectorUnlocked();
+  Status SnoozeFailureDetectorUnlocked() WARN_UNUSED_RESULT;
 
   // Like the above but adds 'additional_delta' to the default timeout
   // period. If allow_logging is set to ALLOW_LOGGING, then this method
   // will print a log message when called.
   Status SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta,
-                                       AllowLogging allow_logging);
+                                       AllowLogging allow_logging) WARN_UNUSED_RESULT;
 
   // Return the minimum election timeout. Due to backoff and random
   // jitter, election timeouts may be longer than this.
@@ -462,45 +466,17 @@ class RaftConsensus : public Consensus,
   // (see Diego Ongaro's thesis section 4.1).
   Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
 
-  // Locks a replica in preparation for StartUnlocked(). Makes
-  // sure the replica is in kInitialized state.
-  Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Obtains the lock for a state read, does not check state.
-  Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until the critical section of an append completes,
-  // i.e. until the replicate message has been assigned an id and placed in
-  // the log queue.
-  // This also checks that the replica is in the appropriate
-  // state (role) to replicate the provided operation, that the operation
-  // contains a replicate message and is of the appropriate type, and returns
-  // Status::IllegalState if that is not the case.
-  Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until the critical section of a commit completes.
-  // This succeeds for all states since a replica which has initiated
-  // a Prepare()/Replicate() must eventually commit even if it's state
-  // has changed after the initial Append()/Update().
-  Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Locks a replica down until an the critical section of an update completes.
-  // Further updates from the same or some other leader will be blocked until
-  // this completes. This also checks that the replica is in the appropriate
-  // state (role) to be updated and returns Status::IllegalState if that
-  // is not the case.
-  Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
-
-  // Changes the role to non-participant and returns a lock that can be
-  // used to make sure no state updates come in until Shutdown() is
-  // completed.
-  Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+  // Checks that the replica is in the appropriate state and role to replicate
+  // the provided operation and that the replicate message does not yet have an
+  // OpId assigned.
+  Status CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+  // Return Status::IllegalState if 'state_' != kRunning, OK otherwise.
+  Status CheckRunningUnlocked() const WARN_UNUSED_RESULT;
 
   // Ensure the local peer is the active leader.
   // Returns OK if leader, IllegalState otherwise.
-  Status CheckActiveLeaderUnlocked() const;
+  Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
 
   // Return current consensus state summary.
   ConsensusStatePB ConsensusStateUnlocked() const;
@@ -515,7 +491,7 @@ class RaftConsensus : public Consensus,
   // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
   // currently *no* configuration change pending, and IllegalState is there *is* a
   // configuration change pending.
-  Status CheckNoConfigChangePendingUnlocked() const;
+  Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT;
 
   // Sets the given configuration as pending commit. Does not persist into the peers
   // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.

http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index bb1575e..a0933c8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -660,8 +660,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
 
-    RaftConsensus::UniqueLock lock;
-    ASSERT_OK(follower0->LockForRead(&lock));
+    RaftConsensus::LockGuard l(follower0->lock_);
 
     // If the locked replica would stop consensus we would hang here
     // as we wait for operations to be replicated to a majority.
@@ -703,13 +702,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
     // and never letting them go.
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
-    RaftConsensus::UniqueLock lock0;
-    ASSERT_OK(follower0->LockForRead(&lock0));
+    RaftConsensus::LockGuard l_0(follower0->lock_);
 
     scoped_refptr<RaftConsensus> follower1;
     CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
-    RaftConsensus::UniqueLock lock1;
-    ASSERT_OK(follower1->LockForRead(&lock1));
+    RaftConsensus::LockGuard l_1(follower1->lock_);
 
     // Append a single message to the queue
     ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));