You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2015/07/22 00:23:21 UTC
trafficserver git commit: clang-format and move copyrights to NOTICES
for RAFT code.
Repository: trafficserver
Updated Branches:
refs/heads/master 38e1d741f -> db4029a2f
clang-format and move copyrights to NOTICES for RAFT code.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/db4029a2
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/db4029a2
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/db4029a2
Branch: refs/heads/master
Commit: db4029a2f8317d32770586ab87d19580450d11c7
Parents: 38e1d74
Author: John Plevyak <jp...@google.com>
Authored: Tue Jul 21 15:21:57 2015 -0700
Committer: John Plevyak <jp...@google.com>
Committed: Tue Jul 21 15:21:57 2015 -0700
----------------------------------------------------------------------
NOTICE | 5 +
lib/raft/raft.h | 89 ++--
lib/raft/raft.proto | 35 +-
lib/raft/raft_impl.h | 989 ++++++++++++++++++++++++---------------------
lib/raft/raft_test.cc | 337 ++++++++-------
5 files changed, 815 insertions(+), 640 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/db4029a2/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index 6132ce6..453af9b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -91,3 +91,8 @@ lib/ts/HashSip.cc contains code derived from code at https://github.com/floodybe
The code is public domain.
The algorithm info can be found at: https://131002.net/siphash/
+
+~~
+
+lib/raft developed by Google.
+Copyright 2014 Google Inc. All Rights Reserved.
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/db4029a2/lib/raft/raft.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft.h b/lib/raft/raft.h
index f926a11..992f399 100644
--- a/lib/raft/raft.h
+++ b/lib/raft/raft.h
@@ -1,17 +1,25 @@
-// Copyright 2014 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/** @file
+
+ Public interface of the RAFT consensus library: the Raft class template, NewRaft() and the server interface it requires.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
#ifndef RAFT_H_
#define RAFT_H_
// An implementation of the RAFT consensus algorithm:
@@ -73,57 +81,56 @@
#include <string>
#include <vector>
-namespace raft {
-template <typename Server>
-class Raft {
- public:
+namespace raft
+{
+template <typename Server> class Raft
+{
+public:
typedef typename Server::Message Message;
typedef typename Server::LogEntry LogEntry;
virtual ~Raft() {}
- virtual void SetElectionTimeout(double seconds) = 0; // 1 sec.
+ virtual void SetElectionTimeout(double seconds) = 0; // 1 sec.
- virtual void Recover(const LogEntry& entry) = 0;
+ virtual void Recover(const LogEntry &entry) = 0;
virtual void Start(double now, int64_t random_seed) = 0;
- virtual void Tick(double now) = 0; // Call every ~election_timeout/10.
- virtual void Propose(const LogEntry& entry) = 0;
- virtual void Run(double now, const Message& message) = 0;
- virtual void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) = 0;
- virtual void Stop() = 0; // Clean shutdown for faster failover.
+ virtual void Tick(double now) = 0; // Call every ~election_timeout/10.
+ virtual void Propose(const LogEntry &entry) = 0;
+ virtual void Run(double now, const Message &message) = 0;
+ virtual void Snapshot(bool uncommitted, ::std::vector<LogEntry> *entries) = 0;
+ virtual void Stop() = 0; // Clean shutdown for faster failover.
};
// The server argument is not owned by the Raft.
-template <typename Server>
-Raft<Server>* NewRaft(Server* server, const ::std::string &node);
+template <typename Server> Raft<Server> *NewRaft(Server *server, const ::std::string &node);
// The Server template argument of Raft must conform to this interface.
-class RaftServerInterface {
- public:
+class RaftServerInterface
+{
+public:
typedef Raft<RaftServerInterface> RaftClass;
- class Config; // See RaftConfigPb in raft.proto.
- class LogEntry; // See RaftLogEntryPb in raft.proto.
- class Message; // See RaftMessagePb in raft.proto.
+ class Config; // See RaftConfigPb in raft.proto.
+ class LogEntry; // See RaftLogEntryPb in raft.proto.
+ class Message; // See RaftMessagePb in raft.proto.
// Since a single server may handle multiple raft objects, the
// RaftClass argument is provided to differentiate the caller.
// Send a raft message to the given node.
// Returns: true if accepted for delivery.
- bool SendMessage(RaftClass* raft, const ::std::string& node,
- const Message& message);
+ bool SendMessage(RaftClass *raft, const ::std::string &node, const Message &message);
// Get a LogEntry to update a node from after (term, index) up to end. These
// could be the actual written log entry or for committed log entries one
// which summarizes the changes.
- void GetLogEntry(RaftClass* raft, int64_t term, int64_t index,
- int64_t end, LogEntry* entry);
+ void GetLogEntry(RaftClass *raft, int64_t term, int64_t index, int64_t end, LogEntry *entry);
// Write a log entry, returning when it has been persisted.
- void WriteLogEntry(RaftClass* raft, const LogEntry& entry);
+ void WriteLogEntry(RaftClass *raft, const LogEntry &entry);
// Commit a log entry, updating the server state.
- void CommitLogEntry(RaftClass* raft, const LogEntry& entry);
+ void CommitLogEntry(RaftClass *raft, const LogEntry &entry);
// The leader has changed. If leader.empty() there is no leader.
- void LeaderChange(RaftClass* raft, const ::std::string& leader);
+ void LeaderChange(RaftClass *raft, const ::std::string &leader);
// The configuration has changed.
- void ConfigChange(RaftClass* raft, const Config& config);
+ void ConfigChange(RaftClass *raft, const Config &config);
};
-} // namespace raft
-#endif // RAFT_H_
+} // namespace raft
+#endif // RAFT_H_
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/db4029a2/lib/raft/raft.proto
----------------------------------------------------------------------
diff --git a/lib/raft/raft.proto b/lib/raft/raft.proto
index 8b35043..bae3706 100644
--- a/lib/raft/raft.proto
+++ b/lib/raft/raft.proto
@@ -1,19 +1,26 @@
-// Copyright 2014 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/** @file
-syntax = "proto2";
+ Protocol buffer definitions (RaftConfigPb, RaftLogEntryPb, RaftMessagePb) used by the RAFT consensus library.
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+syntax = "proto2";
package raft;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/db4029a2/lib/raft/raft_impl.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft_impl.h b/lib/raft/raft_impl.h
index 10248b3..56d5168 100644
--- a/lib/raft/raft_impl.h
+++ b/lib/raft/raft_impl.h
@@ -1,17 +1,25 @@
-// Copyright 2014 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/** @file
+ RaftImpl, the implementation of the Raft class template declared in raft.h.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
#ifndef CONSENSUS_IMPL_H_
#define CONSENSUS_IMPL_H_
#include <stdlib.h>
@@ -25,506 +33,587 @@
#include "raft.h"
-namespace raft {
-
-template <typename Server>
-class RaftImpl : public Raft<Server> {
- public:
- typedef typename Server::Message Message;
- typedef typename Server::LogEntry LogEntry;
- typedef typename Server::Config Config;
-
- RaftImpl(Server* server, const ::std::string &node)
- : server_(server), node_(node) {}
- ~RaftImpl() {}
-
- virtual void SetElectionTimeout(double timeout) {
- election_timeout_ = timeout;
- }
+ namespace raft
+{
+ template <typename Server> class RaftImpl : public Raft<Server>
+ {
+ public:
+ typedef typename Server::Message Message;
+ typedef typename Server::LogEntry LogEntry;
+ typedef typename Server::Config Config;
+
+ RaftImpl(Server *server, const ::std::string &node) : server_(server), node_(node) {}
+ ~RaftImpl() {}
+
+ virtual void
+ SetElectionTimeout(double timeout)
+ {
+ election_timeout_ = timeout;
+ }
- virtual void Recover(const LogEntry& e) {
- if (!e.has_term()) { // LogEntry from server.
- if (e.has_index()) { // Summary log entry.
+ virtual void
+ Recover(const LogEntry &e)
+ {
+ if (!e.has_term()) { // LogEntry from server.
+ if (e.has_index()) { // Summary log entry.
+ ProcessLogEntry(e, true);
+ Commit(true);
+ } else if (e.has_config()) {
+ config_.CopyFrom(e.config());
+ ConfigChanged();
+ }
+ } else { // LogEntry has passed through Raft.
+ if (e.term() > term_)
+ NewTerm(e.term(), e.leader(), true);
+ if (e.has_config_committed())
+ config_committed_ = e.config_committed();
+ if (e.has_data_committed())
+ data_committed_ = e.data_committed();
ProcessLogEntry(e, true);
Commit(true);
- } else if (e.has_config()) {
- config_.CopyFrom(e.config());
- ConfigChanged();
}
- } else { // LogEntry has passed through Raft.
- if (e.term() > term_) NewTerm(e.term(), e.leader(), true);
- if (e.has_config_committed()) config_committed_ = e.config_committed();
- if (e.has_data_committed()) data_committed_ = e.data_committed();
- ProcessLogEntry(e, true);
- Commit(true);
}
- }
- virtual void Start(double now, int64_t seed) {
- last_heartbeat_ = now;
- srand48_r(seed, &rand_);
- double r = 0.0;
- drand48_r(&rand_, &r);
- random_election_delay_ = election_timeout_ * r;
- if (ConfigChanged())
- NewTerm(term_ + 1, leader_, true);
- else
- vote_ = node_; // Conservatively assume we called a vote for ourself.
- server_->ConfigChange(this, config_);
- server_->LeaderChange(this, leader_);
- }
-
- virtual void Tick(double now) {
- if (i_am_in_nodes() && !other_nodes_.empty() &&
- now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
+ virtual void
+ Start(double now, int64_t seed)
+ {
+ last_heartbeat_ = now;
+ srand48_r(seed, &rand_);
double r = 0.0;
drand48_r(&rand_, &r);
random_election_delay_ = election_timeout_ * r;
- last_heartbeat_ = now;
- VoteForMe();
- return;
- }
- // Send heartbeats at 1/4 of timeout to allow for lost packets/connections.
- if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
- last_heartbeat_sent_ = now;
- ReplicateAll(true);
- }
- }
-
- virtual void Propose(const LogEntry& e) {
- assert(i_am_leader());
- LogEntry entry(e);
- entry.set_term(term_);
- entry.set_index(index_ + 1);
- entry.set_previous_log_term(last_log_term_);
- entry.set_previous_log_index(index_);
- ProcessLogEntry(entry, false);
- ReplicateAll(false);
- Commit(false);
- }
-
- virtual void Run(double now, const Message& m) {
- if (m.term() >= term_) seen_term_ = true;
- if (m.term() < term_) return; // Ignore messages from terms gone by.
- if (m.term() > term_) NewTerm(m.term(), m.leader(), false);
- if (m.leader() != "" && leader_ != m.leader() &&
- other_nodes_.count(m.from())) { // Only from nodes I acknowledge.
- leader_ = m.leader();
+ if (ConfigChanged())
+ NewTerm(term_ + 1, leader_, true);
+ else
+ vote_ = node_; // Conservatively assume we called a vote for ourself.
+ server_->ConfigChange(this, config_);
server_->LeaderChange(this, leader_);
}
- auto& n = node_state_[m.from()];
- if (n.term != m.term()) {
- n.term = m.term();
- n.vote = "";
- }
- n.term = term_;
- n.last_log_term = m.last_log_term();
- n.last_log_index = m.last_log_index();
- if (m.from() != leader_ || m.has_vote()) {
- HandleAck(now, m, &n);
- if (m.has_vote()) HandleVote(m, &n);
- return;
- }
- last_heartbeat_ = now;
- if (m.config_committed() > config_committed_ ||
- m.data_committed() > data_committed_) {
- config_committed_ = m.config_committed();
- data_committed_ = m.data_committed();
- WriteInternalLogEntry();
- }
- if (m.has_entry())
- Ack(ProcessLogEntry(m.entry(), false));
- else
- Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
- Commit(false);
- }
- virtual void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) {
- entries->clear();
- LogEntry config_e;
- config_e.set_term(config_.term());
- config_e.set_index(config_.index());
- config_e.set_vote(vote_);
- config_e.set_data_committed(data_committed_);
- config_e.set_config_committed(config_committed_);
- config_e.mutable_config()->CopyFrom(config_);
- entries->push_back(config_e);
- if (pending_config_.has_term() &&
- (!waiting_commits_.size() || // If it isn't in the waiting_commits.
- waiting_commits_.front()->index() > pending_config_.index())) {
- LogEntry pending_e;
- pending_e.set_term(pending_config_.term());
- pending_e.set_index(pending_config_.index());
- pending_e.mutable_config()->CopyFrom(pending_config_);
- entries->push_back(pending_e);
+ virtual void
+ Tick(double now)
+ {
+ if (i_am_in_nodes() && !other_nodes_.empty() && now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
+ double r = 0.0;
+ drand48_r(&rand_, &r);
+ random_election_delay_ = election_timeout_ * r;
+ last_heartbeat_ = now;
+ VoteForMe();
+ return;
+ }
+ // Send heartbeats at 1/4 of timeout to allow for lost
+ // packets/connections.
+ if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
+ last_heartbeat_sent_ = now;
+ ReplicateAll(true);
+ }
}
- if (uncommitted)
- for (auto& e : waiting_commits_) entries->push_back(*e);
- }
- virtual void Stop() { Abdicate(); }
-
- private:
- struct NodeState {
- int64_t term = -1;
- int64_t sent_term = 0;
- int64_t sent_index = 0;
- int64_t last_log_term = -1;
- int64_t last_log_index = -1;
- double ack_received = -1.0e10;
- ::std::string vote;
- };
+ virtual void
+ Propose(const LogEntry &e)
+ {
+ assert(i_am_leader());
+ LogEntry entry(e);
+ entry.set_term(term_);
+ entry.set_index(index_ + 1);
+ entry.set_previous_log_term(last_log_term_);
+ entry.set_previous_log_index(index_);
+ ProcessLogEntry(entry, false);
+ ReplicateAll(false);
+ Commit(false);
+ }
- Message InitializeMessage() {
- Message m;
- m.set_term(term_);
- m.set_last_log_term(last_log_term_);
- m.set_last_log_index(index_);
- m.set_from(node_);
- m.set_leader(leader_);
- m.set_data_committed(data_committed_);
- m.set_config_committed(config_committed_);
- return m;
- }
+ virtual void
+ Run(double now, const Message &m)
+ {
+ if (m.term() >= term_)
+ seen_term_ = true;
+ if (m.term() < term_)
+ return; // Ignore messages from terms gone by.
+ if (m.term() > term_)
+ NewTerm(m.term(), m.leader(), false);
+ if (m.leader() != "" && leader_ != m.leader() && other_nodes_.count(m.from())) { // Only from nodes I acknowledge.
+ leader_ = m.leader();
+ server_->LeaderChange(this, leader_);
+ }
+ auto &n = node_state_[m.from()];
+ if (n.term != m.term()) {
+ n.term = m.term();
+ n.vote = "";
+ }
+ n.term = term_;
+ n.last_log_term = m.last_log_term();
+ n.last_log_index = m.last_log_index();
+ if (m.from() != leader_ || m.has_vote()) {
+ HandleAck(now, m, &n);
+ if (m.has_vote())
+ HandleVote(m, &n);
+ return;
+ }
+ last_heartbeat_ = now;
+ if (m.config_committed() > config_committed_ || m.data_committed() > data_committed_) {
+ config_committed_ = m.config_committed();
+ data_committed_ = m.data_committed();
+ WriteInternalLogEntry();
+ }
+ if (m.has_entry())
+ Ack(ProcessLogEntry(m.entry(), false));
+ else
+ Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
+ Commit(false);
+ }
- void NewTerm(int64_t term, const ::std::string new_leader, bool in_recovery) {
- vote_ = "";
- term_ = term;
- leader_ = new_leader;
- waiting_commits_.clear();
- if (!in_recovery) {
- WriteInternalLogEntry();
- server_->LeaderChange(this, leader_);
+ virtual void
+ Snapshot(bool uncommitted, ::std::vector<LogEntry> *entries)
+ {
+ entries->clear();
+ LogEntry config_e;
+ config_e.set_term(config_.term());
+ config_e.set_index(config_.index());
+ config_e.set_vote(vote_);
+ config_e.set_data_committed(data_committed_);
+ config_e.set_config_committed(config_committed_);
+ config_e.mutable_config()->CopyFrom(config_);
+ entries->push_back(config_e);
+ if (pending_config_.has_term() && (!waiting_commits_.size() || // If it isn't in the waiting_commits.
+ waiting_commits_.front()->index() > pending_config_.index())) {
+ LogEntry pending_e;
+ pending_e.set_term(pending_config_.term());
+ pending_e.set_index(pending_config_.index());
+ pending_e.mutable_config()->CopyFrom(pending_config_);
+ entries->push_back(pending_e);
+ }
+ if (uncommitted)
+ for (auto &e : waiting_commits_)
+ entries->push_back(*e);
}
- }
- void VoteForMe() {
- if (seen_term_ || leader_ != "" || vote_ != node_) {
- vote_ = node_;
- term_++;
- leader_ = "";
- waiting_commits_.clear();
- WriteInternalLogEntry();
- server_->LeaderChange(this, leader_);
- seen_term_ = false;
+ virtual void
+ Stop()
+ {
+ Abdicate();
}
- Vote();
- }
- void Vote() {
- Message m(InitializeMessage());
- m.set_vote(vote_);
- if (vote_ == node_)
- SendToReplicas(m);
- else
- server_->SendMessage(this, vote_, m);
- }
+ private:
+ struct NodeState {
+ int64_t term = -1;
+ int64_t sent_term = 0;
+ int64_t sent_index = 0;
+ int64_t last_log_term = -1;
+ int64_t last_log_index = -1;
+ double ack_received = -1.0e10;
+ ::std::string vote;
+ };
+
+ Message
+ InitializeMessage()
+ {
+ Message m;
+ m.set_term(term_);
+ m.set_last_log_term(last_log_term_);
+ m.set_last_log_index(index_);
+ m.set_from(node_);
+ m.set_leader(leader_);
+ m.set_data_committed(data_committed_);
+ m.set_config_committed(config_committed_);
+ return m;
+ }
- void HandleVote(const Message& m, NodeState* n) {
- n->vote = m.vote();
- if (vote_.empty()) { // I have not voted yet.
- if (m.vote() == node_) { // Abdication.
- VoteForMe();
- } else if (m.last_log_term() >= last_log_term_ &&
- m.last_log_index() >= index_) {
- // Vote for candidate if it is at least as up to date as we are.
- vote_ = m.vote();
+ void
+ NewTerm(int64_t term, const ::std::string new_leader, bool in_recovery)
+ {
+ vote_ = "";
+ term_ = term;
+ leader_ = new_leader;
+ waiting_commits_.clear();
+ if (!in_recovery) {
WriteInternalLogEntry();
- Vote();
- }
- } else if (vote_ == node_ && node_ == n->vote) {
- int votes = 0;
- for (auto& o : other_config_nodes_) {
- auto& s = node_state_[o];
- if (s.term == term_ && s.vote == node_) votes++;
+ server_->LeaderChange(this, leader_);
}
- if (votes + 1 > (other_config_nodes_.size() + 1) / 2) {
- leader_ = node_;
+ }
+
+ void
+ VoteForMe()
+ {
+ if (seen_term_ || leader_ != "" || vote_ != node_) {
+ vote_ = node_;
+ term_++;
+ leader_ = "";
+ waiting_commits_.clear();
WriteInternalLogEntry();
server_->LeaderChange(this, leader_);
- HeartBeat(); // Inform the others.
+ seen_term_ = false;
}
+ Vote();
}
- }
- void Ack(bool ack) {
- Message m(InitializeMessage());
- if (!ack) { // Reset local log state to last committed.
- m.set_nack(true);
- m.set_last_log_term(last_log_committed_term_);
- m.set_last_log_index(last_log_committed_index_);
- index_ = last_log_committed_index_;
- last_log_term_ = last_log_committed_term_;
+ void
+ Vote()
+ {
+ Message m(InitializeMessage());
+ m.set_vote(vote_);
+ if (vote_ == node_)
+ SendToReplicas(m);
+ else
+ server_->SendMessage(this, vote_, m);
}
- server_->SendMessage(this, leader_, m);
- }
- void HandleAck(double now, const Message& m, NodeState* n) {
- n->ack_received = now;
- if (m.nack()) {
- n->sent_index = n->last_log_index;
- n->sent_term = n->last_log_term;
- } else if (i_am_leader()) {
- int acks_needed = (other_nodes_.size() + 1) / 2;
- for (auto& o : other_nodes_)
- if (node_state_[o].ack_received >= last_heartbeat_sent_) acks_needed--;
- if (acks_needed <= 0) last_heartbeat_ = now;
- UpdateCommitted();
+ void
+ HandleVote(const Message &m, NodeState *n)
+ {
+ n->vote = m.vote();
+ if (vote_.empty()) { // I have not voted yet.
+ if (m.vote() == node_) { // Abdication.
+ VoteForMe();
+ } else if (m.last_log_term() >= last_log_term_ && m.last_log_index() >= index_) {
+ // Vote for candidate if it is at least as up to date as we are.
+ vote_ = m.vote();
+ WriteInternalLogEntry();
+ Vote();
+ }
+ } else if (vote_ == node_ && node_ == n->vote) {
+ int votes = 0;
+ for (auto &o : other_config_nodes_) {
+ auto &s = node_state_[o];
+ if (s.term == term_ && s.vote == node_)
+ votes++;
+ }
+ if (votes + 1 > (other_config_nodes_.size() + 1) / 2) {
+ leader_ = node_;
+ WriteInternalLogEntry();
+ server_->LeaderChange(this, leader_);
+ HeartBeat(); // Inform the others.
+ }
+ }
}
- }
-
- void HeartBeat() {
- Message m(InitializeMessage());
- SendToReplicas(m);
- }
- void SendToReplicas(const Message& m) {
- for (auto& n : replicas_) server_->SendMessage(this, n, m);
- }
+ void
+ Ack(bool ack)
+ {
+ Message m(InitializeMessage());
+ if (!ack) { // Reset local log state to last committed.
+ m.set_nack(true);
+ m.set_last_log_term(last_log_committed_term_);
+ m.set_last_log_index(last_log_committed_index_);
+ index_ = last_log_committed_index_;
+ last_log_term_ = last_log_committed_term_;
+ }
+ server_->SendMessage(this, leader_, m);
+ }
- void Abdicate() {
- if (!i_am_leader()) return;
- // Attempt to pass leadership to a worthy successor.
- const ::std::string* best_node = nullptr;
- NodeState* best = nullptr;
- for (auto& n : other_nodes_) {
- auto& s = node_state_[n];
- if (!best || (s.last_log_term > best->last_log_term ||
- (s.last_log_term == best->last_log_term &&
- s.last_log_index > best->last_log_index))) {
- best_node = &n;
- best = &s;
+ void
+ HandleAck(double now, const Message &m, NodeState *n)
+ {
+ n->ack_received = now;
+ if (m.nack()) {
+ n->sent_index = n->last_log_index;
+ n->sent_term = n->last_log_term;
+ } else if (i_am_leader()) {
+ int acks_needed = (other_nodes_.size() + 1) / 2;
+ for (auto &o : other_nodes_)
+ if (node_state_[o].ack_received >= last_heartbeat_sent_)
+ acks_needed--;
+ if (acks_needed <= 0)
+ last_heartbeat_ = now;
+ UpdateCommitted();
}
}
- if (best_node) {
- term_++;
- leader_ = "";
- vote_ = *best_node;
- WriteInternalLogEntry();
+
+ void
+ HeartBeat()
+ {
Message m(InitializeMessage());
- m.set_vote(vote_);
- server_->SendMessage(this, vote_, m);
+ SendToReplicas(m);
}
- }
-
- void WriteInternalLogEntry() {
- LogEntry e;
- e.set_term(term_);
- e.set_leader(leader_);
- e.set_vote(vote_);
- e.set_data_committed(data_committed_);
- e.set_config_committed(config_committed_);
- server_->WriteLogEntry(this, e);
- }
- bool ProcessLogEntry(const LogEntry& e, bool in_recovery) {
- if (e.has_config()) {
- pending_config_.CopyFrom(e.config());
- pending_config_.set_term(e.term());
- pending_config_.set_index(e.index());
- ConfigChanged();
+ void
+ SendToReplicas(const Message &m)
+ {
+ for (auto &n : replicas_)
+ server_->SendMessage(this, n, m);
}
- if (e.has_index()) { // Not an internal entry.
- std::unique_ptr<LogEntry> entry(new LogEntry(e));
- if (e.index() <= index_) return true; // Already seen this.
- if (!entry->has_term()) { // Summary, fill in the internal bits.
- entry->set_term(term_);
- index_ = entry->index() - 1; // Summary need not have an extent().
- entry->set_previous_log_term(last_log_term_);
- entry->set_previous_log_index(index_);
+
+ void
+ Abdicate()
+ {
+ if (!i_am_leader())
+ return;
+ // Attempt to pass leadership to a worthy successor.
+ const ::std::string *best_node = nullptr;
+ NodeState *best = nullptr;
+ for (auto &n : other_nodes_) {
+ auto &s = node_state_[n];
+ if (!best || (s.last_log_term > best->last_log_term ||
+ (s.last_log_term == best->last_log_term && s.last_log_index > best->last_log_index))) {
+ best_node = &n;
+ best = &s;
+ }
}
- if (e.term() < last_log_term_) return true; // Already seen this.
- if (e.term() == last_log_term_ && e.index() <= index_) return true;
- if ((entry->previous_log_term() != last_log_term_ ||
- entry->previous_log_index() != index_))
- return false; // Out of sequence.
- if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
- return false; // Out of sequence.
- last_log_term_ = entry->term();
- index_ = entry->index() + entry->extent();
- if (!in_recovery && i_am_leader()) {
- if (!other_nodes_.size()) data_committed_ = index_;
- if (!other_config_nodes_.size()) config_committed_ = index_;
+ if (best_node) {
+ term_++;
+ leader_ = "";
+ vote_ = *best_node;
+ WriteInternalLogEntry();
+ Message m(InitializeMessage());
+ m.set_vote(vote_);
+ server_->SendMessage(this, vote_, m);
}
- entry->set_data_committed(data_committed_);
- entry->set_config_committed(config_committed_);
- if (!in_recovery) server_->WriteLogEntry(this, *entry);
- waiting_commits_.emplace_back(entry.release());
}
- return true;
- }
- int MajorityIndex(const ::std::set<::std::string>& other) {
- ::std::vector<int64_t> indices(1, index_);
- for (auto& o : other) indices.push_back(node_state_[o].last_log_index);
- sort(indices.begin(), indices.end());
- return indices[indices.size() / 2];
- }
+ void
+ WriteInternalLogEntry()
+ {
+ LogEntry e;
+ e.set_term(term_);
+ e.set_leader(leader_);
+ e.set_vote(vote_);
+ e.set_data_committed(data_committed_);
+ e.set_config_committed(config_committed_);
+ server_->WriteLogEntry(this, e);
+ }
- void UpdateCommitted() {
- int i = MajorityIndex(other_nodes_);
- if (i > data_committed_) {
- data_committed_ = i;
- WriteInternalLogEntry();
- Commit(false);
- HeartBeat();
+ bool
+ ProcessLogEntry(const LogEntry &e, bool in_recovery)
+ {
+ if (e.has_config()) {
+ pending_config_.CopyFrom(e.config());
+ pending_config_.set_term(e.term());
+ pending_config_.set_index(e.index());
+ ConfigChanged();
+ }
+ if (e.has_index()) { // Not an internal entry.
+ std::unique_ptr<LogEntry> entry(new LogEntry(e));
+ if (e.index() <= index_)
+ return true; // Already seen this.
+ if (!entry->has_term()) { // Summary, fill in the internal bits.
+ entry->set_term(term_);
+ index_ = entry->index() - 1; // Summary need not have an extent().
+ entry->set_previous_log_term(last_log_term_);
+ entry->set_previous_log_index(index_);
+ }
+ if (e.term() < last_log_term_)
+ return true; // Already seen this.
+ if (e.term() == last_log_term_ && e.index() <= index_)
+ return true;
+ if ((entry->previous_log_term() != last_log_term_ || entry->previous_log_index() != index_))
+ return false; // Out of sequence.
+ if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
+ return false; // Out of sequence.
+ last_log_term_ = entry->term();
+ index_ = entry->index() + entry->extent();
+ if (!in_recovery && i_am_leader()) {
+ if (!other_nodes_.size())
+ data_committed_ = index_;
+ if (!other_config_nodes_.size())
+ config_committed_ = index_;
+ }
+ entry->set_data_committed(data_committed_);
+ entry->set_config_committed(config_committed_);
+ if (!in_recovery)
+ server_->WriteLogEntry(this, *entry);
+ waiting_commits_.emplace_back(entry.release());
+ }
+ return true;
}
- if (pending_config_.has_term()) { // If a pending configuration change.
- int ci = MajorityIndex(other_config_nodes_);
- // config_committed must be <= data_committed, so the new
- // configuration must also concur with the new data_committed.
- if (i == ci && ci > config_committed_) {
- config_committed_ = ci;
+
+ int
+ MajorityIndex(const ::std::set< ::std::string> &other)
+ {
+ ::std::vector<int64_t> indices(1, index_);
+ for (auto &o : other)
+ indices.push_back(node_state_[o].last_log_index);
+ sort(indices.begin(), indices.end());
+ return indices[indices.size() / 2];
+ }
+
+ void
+ UpdateCommitted()
+ {
+ int i = MajorityIndex(other_nodes_);
+ if (i > data_committed_) {
+ data_committed_ = i;
WriteInternalLogEntry();
Commit(false);
HeartBeat();
- if (!i_am_leader() && other_nodes_.size() > 1) Abdicate();
+ }
+ if (pending_config_.has_term()) { // If a pending configuration change.
+ int ci = MajorityIndex(other_config_nodes_);
+ // config_committed must be <= data_committed, so the new
+ // configuration must also concur with the new data_committed.
+ if (i == ci && ci > config_committed_) {
+ config_committed_ = ci;
+ WriteInternalLogEntry();
+ Commit(false);
+ HeartBeat();
+ if (!i_am_leader() && other_nodes_.size() > 1)
+ Abdicate();
+ }
}
}
- }
- void Commit(bool in_recovery) {
- ::std::vector<std::unique_ptr<LogEntry>> pending;
- while (!waiting_commits_.empty() &&
- waiting_commits_.front()->index() <= data_committed_) {
- auto& e = waiting_commits_.front();
- while (!pending.empty() && e->index() <= pending.back()->index())
- pending.pop_back();
- pending.emplace_back(e.release());
- waiting_commits_.pop_front();
- }
- for (auto& e : pending) {
- server_->CommitLogEntry(this, *e);
- last_log_committed_term_ = e->term();
- last_log_committed_index_ = e->index();
+ void
+ Commit(bool in_recovery)
+ {
+ ::std::vector<std::unique_ptr<LogEntry> > pending;
+ while (!waiting_commits_.empty() && waiting_commits_.front()->index() <= data_committed_) {
+ auto &e = waiting_commits_.front();
+ while (!pending.empty() && e->index() <= pending.back()->index())
+ pending.pop_back();
+ pending.emplace_back(e.release());
+ waiting_commits_.pop_front();
+ }
+ for (auto &e : pending) {
+ server_->CommitLogEntry(this, *e);
+ last_log_committed_term_ = e->term();
+ last_log_committed_index_ = e->index();
+ }
+ CommitConfig(in_recovery);
}
- CommitConfig(in_recovery);
- }
- void CommitConfig(bool in_recovery) {
- if (pending_config_.has_term() && pending_config_.term() == term_ &&
- pending_config_.index() <= config_committed_) {
- config_.Swap(&pending_config_);
- pending_config_.Clear();
- server_->ConfigChange(this, config_);
- if (ConfigChanged()) {
- NewTerm(term_ + 1, leader_, in_recovery);
- if (!in_recovery) HeartBeat();
+ void
+ CommitConfig(bool in_recovery)
+ {
+ if (pending_config_.has_term() && pending_config_.term() == term_ && pending_config_.index() <= config_committed_) {
+ config_.Swap(&pending_config_);
+ pending_config_.Clear();
+ server_->ConfigChange(this, config_);
+ if (ConfigChanged()) {
+ NewTerm(term_ + 1, leader_, in_recovery);
+ if (!in_recovery)
+ HeartBeat();
+ }
}
}
- }
- bool ConfigChanged() { // Returns: true if the leader_ changed.
- other_nodes_.clear();
- other_config_nodes_.clear();
- replicas_.clear();
- for (auto& n : config_.node())
- if (n != node_) {
- other_nodes_.insert(n);
- other_config_nodes_.insert(n);
- }
- for (auto& n : pending_config_.node())
- if (n != node_) other_config_nodes_.insert(n);
- replicas_.insert(config_.replica().begin(), config_.replica().end());
- replicas_.insert(pending_config_.replica().begin(),
- pending_config_.replica().end());
- replicas_.insert(other_nodes_.begin(), other_nodes_.end());
- replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
- ::std::string old_leader = leader_;
- if (!other_nodes_.size())
- leader_ = node_;
- else if (!i_am_in_nodes() && other_nodes_.size() == 1)
- leader_ = *other_nodes_.begin();
- else if (leader_ == node_ && !i_am_in_nodes())
- leader_ = "";
- return leader_ != old_leader;
- }
+ bool
+ ConfigChanged()
+ { // Returns: true if the leader_ changed.
+ other_nodes_.clear();
+ other_config_nodes_.clear();
+ replicas_.clear();
+ for (auto &n : config_.node())
+ if (n != node_) {
+ other_nodes_.insert(n);
+ other_config_nodes_.insert(n);
+ }
+ for (auto &n : pending_config_.node())
+ if (n != node_)
+ other_config_nodes_.insert(n);
+ replicas_.insert(config_.replica().begin(), config_.replica().end());
+ replicas_.insert(pending_config_.replica().begin(), pending_config_.replica().end());
+ replicas_.insert(other_nodes_.begin(), other_nodes_.end());
+ replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
+ ::std::string old_leader = leader_;
+ if (!other_nodes_.size())
+ leader_ = node_;
+ else if (!i_am_in_nodes() && other_nodes_.size() == 1)
+ leader_ = *other_nodes_.begin();
+ else if (leader_ == node_ && !i_am_in_nodes())
+ leader_ = "";
+ return leader_ != old_leader;
+ }
- bool SendReplicationMessage(const ::std::string& n, const LogEntry& entry,
- NodeState* s) {
- Message m(InitializeMessage());
- m.mutable_entry()->CopyFrom(entry);
- if (!server_->SendMessage(this, n, m)) return false;
- s->sent_index = entry.index() + entry.extent();
- s->sent_term = entry.term();
- return true;
- }
+ bool
+ SendReplicationMessage(const ::std::string &n, const LogEntry &entry, NodeState *s)
+ {
+ Message m(InitializeMessage());
+ m.mutable_entry()->CopyFrom(entry);
+ if (!server_->SendMessage(this, n, m))
+ return false;
+ s->sent_index = entry.index() + entry.extent();
+ s->sent_term = entry.term();
+ return true;
+ }
- void Replicate(const ::std::string& n, bool heartbeat) {
- bool sent = false;
- auto& s = node_state_[n];
- if (s.term == term_) { // Replica has acknowledged me as leader.
- int64_t end = index_;
- if (waiting_commits_.size()) end = waiting_commits_.front()->index() - 1;
- while (s.sent_index < end) { // Get from server.
- LogEntry entry;
- server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
- if (!entry.has_term()) {
- // A summary log entry from the server with historical information.
- entry.set_term(last_log_term_);
- entry.set_index(s.sent_index + 1);
+ void
+ Replicate(const ::std::string &n, bool heartbeat)
+ {
+ bool sent = false;
+ auto &s = node_state_[n];
+ if (s.term == term_) { // Replica has acknowledged me as leader.
+ int64_t end = index_;
+ if (waiting_commits_.size())
+ end = waiting_commits_.front()->index() - 1;
+ while (s.sent_index < end) { // Get from server.
+ LogEntry entry;
+ server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
+ if (!entry.has_term()) {
+ // A summary log entry from the server with historical information.
+ entry.set_term(last_log_term_);
+ entry.set_index(s.sent_index + 1);
+ }
+ entry.set_previous_log_term(s.sent_term);
+ entry.set_previous_log_index(s.sent_index);
+ assert(entry.index() > s.sent_index);
+ int64_t x = s.sent_index;
+ if (!SendReplicationMessage(n, entry, &s))
+ break;
+ assert(s.sent_index > x);
+ sent = true;
+ }
+ for (auto &e : waiting_commits_) {
+ if (e->index() <= s.sent_index) // Skip those already sent.
+ continue;
+ if (!SendReplicationMessage(n, *e, &s))
+ break;
+ sent = true;
}
- entry.set_previous_log_term(s.sent_term);
- entry.set_previous_log_index(s.sent_index);
- assert(entry.index() > s.sent_index);
- int64_t x = s.sent_index;
- if (!SendReplicationMessage(n, entry, &s)) break;
- assert(s.sent_index > x);
- sent = true;
}
- for (auto& e : waiting_commits_) {
- if (e->index() <= s.sent_index) // Skip those already sent.
- continue;
- if (!SendReplicationMessage(n, *e, &s)) break;
- sent = true;
+ if (heartbeat && !sent) {
+ Message m(InitializeMessage());
+ server_->SendMessage(this, n, m);
}
}
- if (heartbeat && !sent) {
- Message m(InitializeMessage());
- server_->SendMessage(this, n, m);
+
+ void
+ ReplicateAll(bool heartbeat)
+ {
+ for (auto &n : replicas_)
+ Replicate(n, heartbeat);
}
- }
- void ReplicateAll(bool heartbeat) {
- for (auto& n : replicas_) Replicate(n, heartbeat);
- }
+ bool
+ i_am_leader()
+ {
+ return node_ == leader_;
+ }
+ bool
+ i_am_in_nodes()
+ {
+ auto &n = config_.node();
+ return std::find(n.begin(), n.end(), node_) != n.end();
+ }
- bool i_am_leader() { return node_ == leader_; }
- bool i_am_in_nodes() {
- auto& n = config_.node();
- return std::find(n.begin(), n.end(), node_) != n.end();
- }
+ Server *const server_;
+ struct drand48_data rand_;
+ const ::std::string node_;
+ int64_t term_ = 0; // Current term.
+ int64_t last_log_term_ = -1; // Term of last log entry this node has.
+ int64_t index_ = 0; // Index of last log entry this node has.
+ int64_t config_committed_ = -1;
+ int64_t data_committed_ = -1;
+ int64_t last_log_committed_index_ = -1;
+ int64_t last_log_committed_term_ = -1;
+ double election_timeout_ = 1.0;
+ double last_heartbeat_ = -1.0e10;
+ double last_heartbeat_sent_ = -1.0e10;
+ double random_election_delay_ = 0.0;
+ ::std::string leader_; // The current leader. "" if there is no leader.
+ ::std::string vote_; // My vote this term.
+ Config config_;
+ Config pending_config_;
+ ::std::map< ::std::string, NodeState> node_state_;
+ ::std::deque<std::unique_ptr<LogEntry> > waiting_commits_;
+ bool seen_term_ = true;
+ // Cached values.
+ ::std::set< ::std::string> other_nodes_; // Nodes required for consensus on log entries.
+ ::std::set< ::std::string> other_config_nodes_; // Nodes required for config changes.
+ ::std::set< ::std::string> replicas_; // All nodes receiving the replication stream.
+ };
- Server* const server_;
- struct drand48_data rand_;
- const ::std::string node_;
- int64_t term_ = 0; // Current term.
- int64_t last_log_term_ = -1; // Term of last log entry this node has.
- int64_t index_ = 0; // Index of last log entry this node has.
- int64_t config_committed_ = -1;
- int64_t data_committed_ = -1;
- int64_t last_log_committed_index_ = -1;
- int64_t last_log_committed_term_ = -1;
- double election_timeout_ = 1.0;
- double last_heartbeat_ = -1.0e10;
- double last_heartbeat_sent_ = -1.0e10;
- double random_election_delay_ = 0.0;
- ::std::string leader_; // The current leader. "" if there is no leader.
- ::std::string vote_; // My vote this term.
- Config config_;
- Config pending_config_;
- ::std::map<::std::string, NodeState> node_state_;
- ::std::deque<std::unique_ptr<LogEntry>> waiting_commits_;
- bool seen_term_ = true;
- // Cached values.
- ::std::set<::std::string> other_nodes_; // Nodes required for consensus on log entries.
- ::std::set<::std::string> other_config_nodes_; // Nodes required for config changes.
- ::std::set<::std::string> replicas_; // All nodes receiving the replication stream.
-};
-
-template <typename Server>
-Raft<Server>* NewRaft(Server* server, const ::std::string &node) {
- return new RaftImpl<Server>(server, node);
-}
-} // namespace raft
-#endif // CONSENSUS_IMPL_H_
+ template <typename Server> Raft<Server> *NewRaft(Server * server, const ::std::string &node)
+ {
+ return new RaftImpl<Server>(server, node);
+ }
+} // namespace raft
+#endif // CONSENSUS_IMPL_H_
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/db4029a2/lib/raft/raft_test.cc
----------------------------------------------------------------------
diff --git a/lib/raft/raft_test.cc b/lib/raft/raft_test.cc
index b52ef11..71b6e6e 100644
--- a/lib/raft/raft_test.cc
+++ b/lib/raft/raft_test.cc
@@ -1,17 +1,25 @@
-// Copyright 2014 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
+/** @file
+ Unit tests for the Raft consensus implementation.
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
#include "raft.h"
#include <algorithm>
@@ -38,39 +46,43 @@ using ::std::to_string;
const int kMaxServers = 10;
-namespace raft {
-
+namespace raft
+{
class RaftTest;
-class RaftServer : public RaftServerInterface {
- public:
+class RaftServer : public RaftServerInterface
+{
+public:
typedef RaftMessagePb Message;
typedef RaftLogEntryPb LogEntry;
typedef RaftConfigPb Config;
typedef Raft<RaftServer> RaftClass;
- RaftServer(const string node, RaftTest* test) {
+ RaftServer(const string node, RaftTest *test)
+ {
node_ = node;
test_ = test;
raft_.reset(NewRaft(this, node));
}
- bool SendMessage(RaftClass* raft, const string& node,
- const Message& message);
+ bool SendMessage(RaftClass *raft, const string &node, const Message &message);
- void GetLogEntry(RaftClass* raft, int64_t term, int64_t start,
- int64_t end, LogEntry* entry) {
+ void
+ GetLogEntry(RaftClass *raft, int64_t term, int64_t start, int64_t end, LogEntry *entry)
+ {
if (use_commit_log_) {
- for (auto& e : commits_) {
- if (e->term() < term) continue;
+ for (auto &e : commits_) {
+ if (e->term() < term)
+ continue;
if (e->index() >= start) {
entry->CopyFrom(*e);
return;
}
}
} else {
- for (auto& e : log_) {
- if (e->term() < term) continue;
+ for (auto &e : log_) {
+ if (e->term() < term)
+ continue;
if (e->has_index() && e->index() >= start) {
entry->CopyFrom(*e);
return;
@@ -80,11 +92,15 @@ class RaftServer : public RaftServerInterface {
entry->Clear();
}
- void WriteLogEntry(RaftClass* raft, const LogEntry& entry) {
+ void
+ WriteLogEntry(RaftClass *raft, const LogEntry &entry)
+ {
log_.emplace_back(new LogEntry(entry));
}
- void CommitLogEntry(RaftClass* raft, const LogEntry& entry) {
+ void
+ CommitLogEntry(RaftClass *raft, const LogEntry &entry)
+ {
commits_.emplace_back(new LogEntry(entry));
string s = entry.data();
auto p = s.find("=");
@@ -92,122 +108,149 @@ class RaftServer : public RaftServerInterface {
state_[s.substr(0, p)] = make_pair(entry.index(), s.substr(p + 1));
}
- void LeaderChange(RaftClass* raft, const string& leader) {
+ void
+ LeaderChange(RaftClass *raft, const string &leader)
+ {
leader_ = leader;
}
- void ConfigChange(RaftClass* raft, const Config& config) {
+ void
+ ConfigChange(RaftClass *raft, const Config &config)
+ {
config_.reset(new Config(config));
}
bool use_commit_log_ = false;
- RaftTest* test_;
+ RaftTest *test_;
unique_ptr<Config> config_;
string node_;
string leader_;
unique_ptr<RaftClass> raft_;
- vector<unique_ptr<LogEntry>> log_;
- vector<unique_ptr<LogEntry>> commits_;
- map<string, pair<int64_t, string>> state_;
+ vector<unique_ptr<LogEntry> > log_;
+ vector<unique_ptr<LogEntry> > commits_;
+ map<string, pair<int64_t, string> > state_;
};
template <typename T>
-bool firstless(const T& a, const T& b) {
+bool
+firstless(const T &a, const T &b)
+{
return a.first < b.first;
}
-class RaftTest : public ::testing::Test {
- public:
+class RaftTest : public ::testing::Test
+{
+public:
typedef RaftServer::Message Message;
typedef RaftServer::LogEntry LogEntry;
typedef RaftServer::Config Config;
- void SendMessage(const string& from, const string& to,
- const Message& message) {
- if (down_.count(from) || down_.count(to)) return;
+ void
+ SendMessage(const string &from, const string &to, const Message &message)
+ {
+ if (down_.count(from) || down_.count(to))
+ return;
messages_.emplace_back(make_pair(to, new Message(message)));
}
- void ForwardMessages() {
+ void
+ ForwardMessages()
+ {
while (!messages_.empty()) {
- auto& p = messages_.front();
- for (auto& s : servers_)
- if (p.first == s->node_) s->raft_->Run(now_, *p.second);
+ auto &p = messages_.front();
+ for (auto &s : servers_)
+ if (p.first == s->node_)
+ s->raft_->Run(now_, *p.second);
delete p.second;
messages_.pop_front();
}
}
- protected:
+protected:
RaftTest() : now_(0) {}
- virtual ~RaftTest() {
- for (auto& p : messages_) delete p.second;
+ virtual ~RaftTest()
+ {
+ for (auto &p : messages_)
+ delete p.second;
}
// 20 ticks gives 2 full election timeouts because the timeouts are random
// on any given node and vary from [1, 2) timeouts.
- void Ticks(int n) {
+ void
+ Ticks(int n)
+ {
for (int i = 0; i < n; i++) {
now_ += 0.1;
- for (auto& s : servers_) {
+ for (auto &s : servers_) {
s->raft_->Tick(now_);
ForwardMessages();
}
}
}
- void StartUp(int n, const LogEntry& config_log_entry) {
+ void
+ StartUp(int n, const LogEntry &config_log_entry)
+ {
int offset = servers_.size();
for (int i = offset; i < n + offset; i++) {
servers_.emplace_back(new RaftServer(to_string(i), this));
- auto& raft = *servers_[i]->raft_.get();
+ auto &raft = *servers_[i]->raft_.get();
raft.Recover(config_log_entry);
raft.Start(0, i);
}
}
- void CrashAndRecover(int i, const LogEntry& config_log_entry) {
- vector<unique_ptr<LogEntry>> log;
- for (auto& p : servers_[i]->log_) log.emplace_back(p.release());
+ void
+ CrashAndRecover(int i, const LogEntry &config_log_entry)
+ {
+ vector<unique_ptr<LogEntry> > log;
+ for (auto &p : servers_[i]->log_)
+ log.emplace_back(p.release());
servers_[i].reset(new RaftServer(to_string(i), this));
- auto& raft = *servers_[i]->raft_.get();
+ auto &raft = *servers_[i]->raft_.get();
raft.Recover(config_log_entry);
- for (auto& p : log) {
+ for (auto &p : log) {
raft.Recover(*p);
servers_[i]->log_.emplace_back(p.release());
}
raft.Start(now_, i);
}
- void CrashAndBurn(int i, const LogEntry& config_log_entry) {
+ void
+ CrashAndBurn(int i, const LogEntry &config_log_entry)
+ {
servers_[i].reset(new RaftServer(to_string(i), this));
- auto& raft = *servers_[i]->raft_.get();
+ auto &raft = *servers_[i]->raft_.get();
raft.Recover(config_log_entry);
raft.Start(now_, i);
}
- void SnapshotCrashAndRecover(int i, const LogEntry& config_log_entry) {
+ void
+ SnapshotCrashAndRecover(int i, const LogEntry &config_log_entry)
+ {
vector<LogEntry> entries;
- vector<pair<int64_t, string>> state;
- for (auto& p : servers_[i]->state_)
- state.push_back(
- make_pair(p.second.first, p.first + "=" + p.second.second));
- std::sort(state.begin(), state.end(), firstless<pair<int64_t, string>>);
+ vector<pair<int64_t, string> > state;
+ for (auto &p : servers_[i]->state_)
+ state.push_back(make_pair(p.second.first, p.first + "=" + p.second.second));
+ std::sort(state.begin(), state.end(), firstless<pair<int64_t, string> >);
servers_[i]->log_.clear();
- for (auto& s : state) {
- LogEntry* e = new LogEntry;
+ for (auto &s : state) {
+ LogEntry *e = new LogEntry;
e->set_index(s.first);
e->set_data(s.second);
servers_[i]->log_.emplace_back(e);
}
- auto& raft = *servers_[i]->raft_.get();
+ auto &raft = *servers_[i]->raft_.get();
raft.Snapshot(false, &entries);
- for (auto& e : entries) servers_[i]->log_.emplace_back(new LogEntry(e));
+ for (auto &e : entries)
+ servers_[i]->log_.emplace_back(new LogEntry(e));
servers_[i]->state_.clear();
CrashAndRecover(i, config_log_entry);
}
- LogEntry ConfigLogEntry(int n) {
+ LogEntry
+ ConfigLogEntry(int n)
+ {
LogEntry config_log_entry;
for (int i = 0; i < n; i++)
config_log_entry.mutable_config()->add_node(to_string(i));
@@ -216,25 +259,28 @@ class RaftTest : public ::testing::Test {
double now_;
set<string> down_;
- vector<unique_ptr<RaftServer>> servers_;
- deque<pair<string, Message*>> messages_;
+ vector<unique_ptr<RaftServer> > servers_;
+ deque<pair<string, Message *> > messages_;
};
-bool RaftServer::SendMessage(RaftClass* raft, const string& node,
- const Message& message) {
+bool
+RaftServer::SendMessage(RaftClass *raft, const string &node, const Message &message)
+{
test_->SendMessage(node_, node, message);
return true;
}
-TEST_F(RaftTest, OneEmptyConfig) {
+TEST_F(RaftTest, OneEmptyConfig)
+{
servers_.emplace_back(new RaftServer("0", this));
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
raft.Start(0, 0);
Ticks(20);
EXPECT_EQ(servers_[0]->leader_, "0");
}
-TEST_F(RaftTest, One) {
+TEST_F(RaftTest, One)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
StartUp(1, config_log_entry);
@@ -242,7 +288,8 @@ TEST_F(RaftTest, One) {
EXPECT_EQ(servers_[0]->leader_, "0");
}
-TEST_F(RaftTest, OneTwoNotParticipating) {
+TEST_F(RaftTest, OneTwoNotParticipating)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
// Startup server 0 as leader.
@@ -255,7 +302,8 @@ TEST_F(RaftTest, OneTwoNotParticipating) {
EXPECT_EQ(servers_[1]->leader_, "0");
}
-TEST_F(RaftTest, OneTwo) {
+TEST_F(RaftTest, OneTwo)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
// Startup server 0 as leader.
@@ -266,7 +314,7 @@ TEST_F(RaftTest, OneTwo) {
Ticks(20);
// Add 1 into consensus.
{
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
config_log_entry.mutable_config()->add_node("1");
@@ -279,7 +327,8 @@ TEST_F(RaftTest, OneTwo) {
EXPECT_EQ(servers_[1]->commits_.size(), 1);
}
-TEST_F(RaftTest, OneTwoSwitchToTwo) {
+TEST_F(RaftTest, OneTwoSwitchToTwo)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
// Startup servers 0, and 1 with 0 as leader.
@@ -288,7 +337,7 @@ TEST_F(RaftTest, OneTwoSwitchToTwo) {
Ticks(20);
// Add 1 into consensus.
{
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry config_log_entry(ConfigLogEntry(2));
raft.Propose(config_log_entry);
Ticks(20);
@@ -297,7 +346,7 @@ TEST_F(RaftTest, OneTwoSwitchToTwo) {
EXPECT_EQ(servers_[1]->leader_, "0");
// Switch to only having 1.
{
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("1");
config_log_entry.mutable_config()->add_replica("0");
@@ -308,7 +357,8 @@ TEST_F(RaftTest, OneTwoSwitchToTwo) {
EXPECT_EQ(servers_[1]->leader_, "1");
}
-TEST_F(RaftTest, OneThenTwo) {
+TEST_F(RaftTest, OneThenTwo)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
// Startup servers 0 and 1, with 0 as leader.
@@ -317,7 +367,7 @@ TEST_F(RaftTest, OneThenTwo) {
Ticks(20);
// Switch to only having 1.
{
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("1");
raft.Propose(config_log_entry);
@@ -327,7 +377,8 @@ TEST_F(RaftTest, OneThenTwo) {
EXPECT_EQ(servers_[1]->leader_, "1");
}
-TEST_F(RaftTest, OneAndTwo) {
+TEST_F(RaftTest, OneAndTwo)
+{
LogEntry config_log_entry(ConfigLogEntry(2));
// Startup servers 0, and 1 in nodes.
StartUp(2, config_log_entry);
@@ -336,7 +387,8 @@ TEST_F(RaftTest, OneAndTwo) {
EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
}
-TEST_F(RaftTest, OneAndTwoAndThree) {
+TEST_F(RaftTest, OneAndTwoAndThree)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1 and 2 in nodes.
StartUp(3, config_log_entry);
@@ -346,7 +398,8 @@ TEST_F(RaftTest, OneAndTwoAndThree) {
EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
}
-TEST_F(RaftTest, OneAndTwoNotThree) {
+TEST_F(RaftTest, OneAndTwoNotThree)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1 with config [0, 1, 2].
StartUp(2, config_log_entry);
@@ -355,7 +408,8 @@ TEST_F(RaftTest, OneAndTwoNotThree) {
EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
}
-TEST_F(RaftTest, OneAndTwoThenTwoAndThree) {
+TEST_F(RaftTest, OneAndTwoThenTwoAndThree)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1 with config [0, 1, 2].
StartUp(2, config_log_entry);
@@ -370,22 +424,23 @@ TEST_F(RaftTest, OneAndTwoThenTwoAndThree) {
EXPECT_EQ(servers_[2]->leader_, servers_[1]->leader_);
}
-TEST_F(RaftTest, OneTwoThreeThenAbdicate) {
+TEST_F(RaftTest, OneTwoThreeThenAbdicate)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1, 2 with config [0, 1, 2].
StartUp(3, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
raft.Stop();
down_.insert(to_string(ileader));
- Ticks(1); // Abdication will cause immediate reelection.
+ Ticks(1); // Abdication will cause immediate reelection.
EXPECT_NE(servers_[(ileader + 1) % 3]->leader_, "");
- EXPECT_EQ(servers_[(ileader + 1) % 3]->leader_,
- servers_[(ileader + 2) % 3]->leader_);
+ EXPECT_EQ(servers_[(ileader + 1) % 3]->leader_, servers_[(ileader + 2) % 3]->leader_);
}
-TEST_F(RaftTest, OneTwoThreeThenAllSeparate) {
+TEST_F(RaftTest, OneTwoThreeThenAllSeparate)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1, 2 with config [0, 1, 2].
StartUp(3, config_log_entry);
@@ -399,7 +454,8 @@ TEST_F(RaftTest, OneTwoThreeThenAllSeparate) {
EXPECT_EQ(servers_[2]->leader_, "");
}
-TEST_F(RaftTest, OneTwoThreeThenAllSeparateThenTogether) {
+TEST_F(RaftTest, OneTwoThreeThenAllSeparateThenTogether)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
// Startup servers 0, 1, 2 with config [0, 1, 2].
StartUp(3, config_log_entry);
@@ -415,10 +471,11 @@ TEST_F(RaftTest, OneTwoThreeThenAllSeparateThenTogether) {
EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
}
-TEST_F(RaftTest, OneLog) {
+TEST_F(RaftTest, OneLog)
+{
LogEntry config_log_entry;
StartUp(1, config_log_entry);
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -428,10 +485,11 @@ TEST_F(RaftTest, OneLog) {
EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
}
-TEST_F(RaftTest, OneLogLog) {
+TEST_F(RaftTest, OneLogLog)
+{
LogEntry config_log_entry;
StartUp(1, config_log_entry);
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -445,13 +503,14 @@ TEST_F(RaftTest, OneLogLog) {
EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
}
-TEST_F(RaftTest, OneTwoLogLog) {
+TEST_F(RaftTest, OneTwoLogLog)
+{
LogEntry config_log_entry(ConfigLogEntry(2));
StartUp(2, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
int iother = ileader ? 0 : 1;
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -460,16 +519,13 @@ TEST_F(RaftTest, OneTwoLogLog) {
raft.Propose(log_entry);
Ticks(20);
EXPECT_EQ(servers_[ileader]->log_.size(), 7);
- EXPECT_NE(servers_[ileader]->log_[0]->vote(), ""); // vote.
- EXPECT_NE(servers_[ileader]->log_[1]->leader(), ""); // election.
- EXPECT_EQ(servers_[ileader]->log_[2]->data_committed(),
- servers_[ileader]->log_[1]->index());
+ EXPECT_NE(servers_[ileader]->log_[0]->vote(), ""); // vote.
+ EXPECT_NE(servers_[ileader]->log_[1]->leader(), ""); // election.
+ EXPECT_EQ(servers_[ileader]->log_[2]->data_committed(), servers_[ileader]->log_[1]->index());
EXPECT_EQ(servers_[ileader]->log_[3]->data(), "a");
- EXPECT_EQ(servers_[ileader]->log_[4]->data_committed(),
- servers_[ileader]->log_[3]->index());
+ EXPECT_EQ(servers_[ileader]->log_[4]->data_committed(), servers_[ileader]->log_[3]->index());
EXPECT_EQ(servers_[ileader]->log_[5]->data(), "b");
- EXPECT_EQ(servers_[ileader]->log_[6]->data_committed(),
- servers_[ileader]->log_[5]->index());
+ EXPECT_EQ(servers_[ileader]->log_[6]->data_committed(), servers_[ileader]->log_[5]->index());
EXPECT_EQ(servers_[ileader]->commits_.size(), 2);
EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
@@ -478,12 +534,13 @@ TEST_F(RaftTest, OneTwoLogLog) {
EXPECT_EQ(servers_[iother]->commits_[1]->data(), "b");
}
-TEST_F(RaftTest, OneTwoThreeLogDownLogUp) {
+TEST_F(RaftTest, OneTwoThreeLogDownLogUp)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
StartUp(3, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -506,12 +563,13 @@ TEST_F(RaftTest, OneTwoThreeLogDownLogUp) {
}
}
-TEST_F(RaftTest, OneTwoThreeLogLogThreeDamagedLogRestore) {
+TEST_F(RaftTest, OneTwoThreeLogLogThreeDamagedLogRestore)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
StartUp(3, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -531,23 +589,24 @@ TEST_F(RaftTest, OneTwoThreeLogLogThreeDamagedLogRestore) {
}
}
-TEST_F(RaftTest, OneTwoLogLogThenThree) {
+TEST_F(RaftTest, OneTwoLogLogThenThree)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
config_log_entry.mutable_config()->add_node("1");
StartUp(2, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
log_entry.set_data("b");
raft.Propose(log_entry);
Ticks(20);
- StartUp(1, config_log_entry); // Start node 2.
+ StartUp(1, config_log_entry); // Start node 2.
config_log_entry.mutable_config()->add_node("2");
- raft.Propose(config_log_entry); // Change config to [0, 1, 2].
+ raft.Propose(config_log_entry); // Change config to [0, 1, 2].
Ticks(20);
EXPECT_EQ(servers_[1]->commits_.size(), 3);
EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
@@ -560,11 +619,12 @@ TEST_F(RaftTest, OneTwoLogLogThenThree) {
EXPECT_EQ(servers_[2]->commits_[2]->config().node_size(), 3);
}
-TEST_F(RaftTest, OneRecover) {
+TEST_F(RaftTest, OneRecover)
+{
LogEntry config_log_entry;
StartUp(1, config_log_entry);
{
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -575,7 +635,8 @@ TEST_F(RaftTest, OneRecover) {
EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
}
-TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader) {
+TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
config_log_entry.mutable_config()->add_node("1");
@@ -583,7 +644,7 @@ TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader) {
StartUp(3, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -603,12 +664,13 @@ TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader) {
}
}
-TEST_F(RaftTest, FiveCrashLeaderAndAnotherAndRecover) {
+TEST_F(RaftTest, FiveCrashLeaderAndAnotherAndRecover)
+{
LogEntry config_log_entry(ConfigLogEntry(5));
StartUp(5, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -630,12 +692,13 @@ TEST_F(RaftTest, FiveCrashLeaderAndAnotherAndRecover) {
EXPECT_EQ(servers_[(ileader + 1) % 5]->commits_[1]->data(), "b");
}
-TEST_F(RaftTest, FiveCrashAndBurnLeaderAndAnother) {
+TEST_F(RaftTest, FiveCrashAndBurnLeaderAndAnother)
+{
LogEntry config_log_entry(ConfigLogEntry(5));
StartUp(5, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -656,7 +719,8 @@ TEST_F(RaftTest, FiveCrashAndBurnLeaderAndAnother) {
// Test that a log from a leader without quorum never is committed and that a
// log with the same index from a leader with quorum is.
-TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
+TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2)
+{
LogEntry config_log_entry(ConfigLogEntry(5));
StartUp(5, config_log_entry);
Ticks(20);
@@ -664,7 +728,7 @@ TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
down_.insert(to_string((ileader + 1) % 5));
down_.insert(to_string((ileader + 2) % 5));
down_.insert(to_string((ileader + 3) % 5));
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -676,7 +740,7 @@ TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
down_.insert(to_string(ileader));
Ticks(20);
int ileader2 = servers_[((ileader + 1) % 5)]->leader_[0] - '0';
- auto& raft2 = *servers_[ileader2]->raft_.get();
+ auto &raft2 = *servers_[ileader2]->raft_.get();
log_entry.set_data("c");
raft2.Propose(log_entry);
log_entry.set_data("d");
@@ -692,13 +756,14 @@ TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
}
}
-TEST_F(RaftTest, ReplicaFailover) {
+TEST_F(RaftTest, ReplicaFailover)
+{
LogEntry config_log_entry;
config_log_entry.mutable_config()->add_node("0");
config_log_entry.mutable_config()->add_replica("1");
StartUp(2, config_log_entry);
Ticks(20);
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a");
raft.Propose(log_entry);
@@ -731,10 +796,11 @@ TEST_F(RaftTest, ReplicaFailover) {
EXPECT_EQ(servers_[1]->leader_, "1");
}
-TEST_F(RaftTest, OneSnapshotTwo) {
+TEST_F(RaftTest, OneSnapshotTwo)
+{
LogEntry config_log_entry;
StartUp(1, config_log_entry);
- auto& raft = *servers_[0]->raft_.get();
+ auto &raft = *servers_[0]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a=1");
raft.Propose(log_entry);
@@ -755,12 +821,13 @@ TEST_F(RaftTest, OneSnapshotTwo) {
EXPECT_EQ(servers_[0]->state_["b"].second, "3");
}
-TEST_F(RaftTest, OneTwoThreeSnapshotOneTwoCrashAndBurnThree) {
+TEST_F(RaftTest, OneTwoThreeSnapshotOneTwoCrashAndBurnThree)
+{
LogEntry config_log_entry(ConfigLogEntry(3));
StartUp(3, config_log_entry);
Ticks(20);
int ileader = servers_[0]->leader_[0] - '0';
- auto& raft = *servers_[ileader]->raft_.get();
+ auto &raft = *servers_[ileader]->raft_.get();
LogEntry log_entry;
log_entry.set_data("a=1");
raft.Propose(log_entry);
@@ -780,4 +847,4 @@ TEST_F(RaftTest, OneTwoThreeSnapshotOneTwoCrashAndBurnThree) {
EXPECT_EQ(servers_[2]->state_["a"].second, "1");
EXPECT_EQ(servers_[2]->state_["b"].second, "3");
}
-} // namespace raft
+} // namespace raft