You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2015/07/21 23:54:33 UTC
trafficserver git commit: TS-3786: Implementation of the RAFT
consensus protocol.
Repository: trafficserver
Updated Branches:
refs/heads/master 3184b5521 -> 38e1d741f
TS-3786: Implementation of the RAFT consensus protocol.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/38e1d741
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/38e1d741
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/38e1d741
Branch: refs/heads/master
Commit: 38e1d741f2056a7893a0a90452d37a29ae9bf707
Parents: 3184b55
Author: John Plevyak <jp...@google.com>
Authored: Tue Jul 21 10:16:48 2015 -0700
Committer: John Plevyak <jp...@google.com>
Committed: Tue Jul 21 14:53:26 2015 -0700
----------------------------------------------------------------------
lib/raft/raft.h | 129 ++++++++
lib/raft/raft.proto | 92 ++++++
lib/raft/raft_impl.h | 530 ++++++++++++++++++++++++++++++
lib/raft/raft_test.cc | 783 ++++++++++++++++++++++++++++++++++++++++++++
lib/raft/test_makefile | 140 ++++++++
5 files changed, 1674 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft.h b/lib/raft/raft.h
new file mode 100644
index 0000000..f926a11
--- /dev/null
+++ b/lib/raft/raft.h
@@ -0,0 +1,129 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RAFT_H_
+#define RAFT_H_
+// An implementation of the RAFT consensus algorithm:
+// https://ramcloud.stanford.edu/raft.pdf
+//
+// Features:
+// * Leader election
+// * Log replication
+// * Snapshotting
+// * Configuration updates including changing the set of nodes participating.
+// * Resistant to failures (i.e. complete/partial log/message loss).
+//
+// Servers need to implement the functionality described by
+// RaftServerInterface (below). A single server may have many Raft objects.
+//
+// On startup:
+//   MyServer server;
+//   Raft<MyServer>* raft(NewRaft(&server, my_node_name));
+//   Initialize the log with the initial config if this is the first run:
+//     create an empty log entry, set the initial config and write it.
+//   for (log_entry : log)
+//     raft->Recover(log_entry);
+//   expect CommitLogEntry() calls.
+//   raft->Start(now, random_seed);  // random_seed seeds the election RNG.
+//   expect ConfigChange() and LeaderChange() calls.
+//
+// Main loop (executed by the user code until done):
+//   Call Tick(now) initially and periodically, e.g. every 25 msecs.
+//     now is monotonically increasing time in seconds (double).
+//   On a message from a node, call Run(now, message).
+//     expect SendMessage(), GetLogEntry(), WriteLogEntry(),
+//     CommitLogEntry(), LeaderChange() and ConfigChange() calls.
+//     Note: WriteLogEntry() is blocking, so if you are using Stubby (or any
+//     RPC framework whose handler threads must not block), shift to another
+//     thread before calling raft->Run().
+//   On periodic snapshot, compress the log and call Snapshot() to
+//     get entries with the raft meta data (and optionally uncommitted
+//     entries) which should appear at the end of any compressed log/snapshot.
+//     If CommitLogEntry() is idempotent the snapshot can be taken
+//     incrementally and a conservative log tail can be retained.
+//   When done call Stop()
+//     expect SendMessage() etc. calls.
+//     delete the Raft object.
+//
+// On the leader:
+//   Periodically call Propose() with a new log entry. This may include data
+//   or config, neither or both.
+//
+// Changing the nodes participating:
+//   Configuration changes involving changes in the set of nodes require
+//   agreement from a majority of both the new and old configurations. Until
+//   the configuration change has been accepted, the old quorum can continue
+//   to commit log entries, resulting in config_committed falling behind
+//   data_committed. Once both quorums have accepted the new configuration,
+//   the next commit will require both quorums and will update both
+//   data_committed and config_committed, at which point ConfigChange() will
+//   be called and the new configuration will be live.
+//
+// This class is thread-unsafe (wrap it with a lock) and not reentrant.
+#include <string>
+#include <vector>
+
+namespace raft {
+// The consensus object for one Raft group. Instances are created by
+// NewRaft(); see the usage notes at the top of this file.
+template <typename Server>
+class Raft {
+ public:
+ typedef typename Server::Message Message;
+ typedef typename Server::LogEntry LogEntry;
+
+ virtual ~Raft() {}
+
+ // Sets the base election timeout in seconds. RaftImpl defaults to 1.0 and,
+ // before calling a vote, waits a further random delay in
+ // [0, election_timeout).
+ virtual void SetElectionTimeout(double seconds) = 0; // 1 sec.
+
+ // Replays one persisted log entry; call it for every entry before Start().
+ virtual void Recover(const LogEntry& entry) = 0;
+ // Begins operation at time `now`. `random_seed` seeds the generator that
+ // randomizes election timeouts.
+ virtual void Start(double now, int64_t random_seed) = 0;
+ // Drives elections and, on the leader, heartbeats (RaftImpl sends them
+ // every election_timeout/4).
+ virtual void Tick(double now) = 0; // Call every ~election_timeout/10.
+ // Appends `entry` to the log and replicates it. Leader only (RaftImpl
+ // asserts this).
+ virtual void Propose(const LogEntry& entry) = 0;
+ // Processes a message received from another node at time `now`.
+ virtual void Run(double now, const Message& message) = 0;
+ // Replaces *entries with the Raft metadata entries that must follow a
+ // compressed log; if `uncommitted`, not-yet-committed entries are appended.
+ virtual void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) = 0;
+ virtual void Stop() = 0; // Clean shutdown for faster failover.
+};
+// Creates a Raft for the node named `node` (the name used in the
+// configuration and in messages). The server argument is not owned by the
+// Raft; the caller owns, and must eventually delete, the returned object.
+template <typename Server>
+Raft<Server>* NewRaft(Server* server, const ::std::string &node);
+
+// The Server template argument of Raft must conform to this interface.
+// It is documentation only: Raft never derives from or instantiates it.
+class RaftServerInterface {
+ public:
+ typedef Raft<RaftServerInterface> RaftClass;
+ class Config; // See RaftConfigPb in raft.proto.
+ class LogEntry; // See RaftLogEntryPb in raft.proto.
+ class Message; // See RaftMessagePb in raft.proto.
+
+ // Since a single server may handle multiple raft objects, the
+ // RaftClass argument is provided to differentiate the caller.
+
+ // Send a raft message to the given node.
+ // Returns: true if accepted for delivery. When replicating, the Raft stops
+ // sending further entries to `node` as soon as this returns false.
+ bool SendMessage(RaftClass* raft, const ::std::string& node,
+ const Message& message);
+ // Fill *entry with the log entry that starts at `index` (inclusive), for a
+ // node whose last received entry had term `term`; entries past `end`
+ // (inclusive upper bound) are not needed. This may be the actual written log
+ // entry or, for committed log entries, one which summarizes the changes
+ // (optionally covering several indexes via `extent`). If the returned entry
+ // has no term, the Raft treats it as a summary and assigns the term and
+ // `index` itself.
+ void GetLogEntry(RaftClass* raft, int64_t term, int64_t index,
+ int64_t end, LogEntry* entry);
+ // Write a log entry, returning when it has been persisted (blocking).
+ void WriteLogEntry(RaftClass* raft, const LogEntry& entry);
+ // Commit a log entry, updating the server state.
+ void CommitLogEntry(RaftClass* raft, const LogEntry& entry);
+ // The leader has changed. If leader.empty() there is no leader.
+ void LeaderChange(RaftClass* raft, const ::std::string& leader);
+ // The configuration has changed.
+ void ConfigChange(RaftClass* raft, const Config& config);
+};
+} // namespace raft
+#endif // RAFT_H_
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft.proto
----------------------------------------------------------------------
diff --git a/lib/raft/raft.proto b/lib/raft/raft.proto
new file mode 100644
index 0000000..8b35043
--- /dev/null
+++ b/lib/raft/raft.proto
@@ -0,0 +1,92 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+
+package raft;
+
+// This file describes the message types required by the consensus algorithm.
+
+// For Raft<Server>, Server::Config must contain these fields.
+// RaftLogEntryPb.config carries a value of this type.
+message RaftConfigPb {
+ repeated string node = 1; // Nodes participating in the consensus.
+ // Replicas receive the replication stream but do not vote and do not
+ // count toward commit majorities.
+ repeated string replica = 2; // Replicas not participating in consensus.
+ // Internal fields: set by the consensus algorithm.
+ // term/index identify the log entry which carried this configuration.
+ optional int64 term = 3;
+ optional int64 index = 4;
+
+ extensions 100 to max;
+}
+
+// For Raft<Server>, Server::LogEntry must contain these fields.
+//
+// PUBLIC
+// The only fields which should be set by the user are:
+// 'data' which can be used to store any user data. It is otherwise unused.
+// 'config' which can be set to propose a configuration change.
+// 'index' should be set in response to GetLogEntry() either because the
+// entry has come from the log (where it was set by Raft) or for
+// summaries, by the user.
+// 'extent' can be set for Propose() and summaries from GetLogEntry() to
+// indicate that the log covers a range of indexes.
+//
+// PRIVATE
+// There are 3 types of consensus log entries:
+// User: when !has_term(), the entry has come from the user:
+// * the initial configuration which was prepended to the log manually.
+// * summaries in a compressed log or from GetLogEntry().
+// Local: when !has_index(), the entry stores only internal state:
+// * the leader, vote and data_committed, config_committed fields.
+// Consensus: replicatable log entries which have been through consensus.
+// * User log entries are converted to Consensus log entries on ingest.
+message RaftLogEntryPb {
+ // External fields: set by the user.
+ optional bytes data = 1; // Available for user data.
+ optional RaftConfigPb config = 2;
+ optional int64 index = 3; // Monotonic log entry index.
+ optional int64 extent = 4; // Indexes covered are [index, index + extent].
+ // External/Internal fields: set by consensus, readable by user.
+ optional int64 term = 5; // When !has_term() directly from user.
+ // Internal fields: set by the consensus algorithm, purely internal.
+ // The (term, last index) of the entry this one follows; used to detect
+ // out-of-sequence entries.
+ optional int64 previous_log_term = 6;
+ optional int64 previous_log_index = 7;
+ // Local fields: set by consensus, only persisted on the local node.
+ // NOTE(review): data_committed/config_committed are also stamped onto
+ // Consensus entries when they are written, and those entries are
+ // replicated.
+ optional string leader = 8;
+ optional int64 data_committed = 9; // Index of committed data.
+ optional int64 config_committed = 10; // Index of committed config.
+ optional string vote = 11; // Vote in the term.
+
+ extensions 100 to max;
+}
+
+// For Raft<Server>, Server::Message must contain these fields.
+// PRIVATE
+// Every message carries the sender's term, name, leader, commit indexes and
+// log position; the remaining fields depend on the message's purpose.
+message RaftMessagePb {
+ optional int64 term = 1; // Sender's current term.
+ optional string from = 2; // Node this message is from.
+ optional string leader = 3; // Sender's leader ("" if none).
+ optional int64 data_committed = 4; // Sender's committed data index.
+ optional int64 config_committed = 5; // Sender's committed config index.
+ optional RaftLogEntryPb entry = 8; // A replicated log entry.
+ // Acknowledgement.
+ optional bool nack = 9; // Reset the stream to last_log_term/index.
+ // Voting and acknowledgement.
+ // Sender's last log position; on a nack, its last committed position.
+ optional int64 last_log_term = 10;
+ optional int64 last_log_index = 11;
+ // Voting.
+ optional string vote = 12; // Node the sender votes for.
+
+ extensions 100 to max;
+}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft_impl.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft_impl.h b/lib/raft/raft_impl.h
new file mode 100644
index 0000000..10248b3
--- /dev/null
+++ b/lib/raft/raft_impl.h
@@ -0,0 +1,530 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef CONSENSUS_IMPL_H_
+#define CONSENSUS_IMPL_H_
+#include <assert.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <algorithm>
+#include <deque>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "raft.h"
+
+namespace raft {
+
+// The concrete Raft<Server> implementation returned by NewRaft(). Like the
+// Raft interface it implements, it is not thread-safe and not reentrant.
+template <typename Server>
+class RaftImpl : public Raft<Server> {
+ public:
+  typedef typename Server::Message Message;
+  typedef typename Server::LogEntry LogEntry;
+  typedef typename Server::Config Config;
+
+  // `server` is borrowed, not owned. `node` is this node's name: it is sent
+  // as Message::from and compared against the names in the configuration.
+  RaftImpl(Server* server, const ::std::string& node)
+      : server_(server), node_(node) {}
+  ~RaftImpl() override {}
+
+  void SetElectionTimeout(double timeout) override {
+    election_timeout_ = timeout;
+  }
+
+  // Replays one persisted log entry. Called for every entry before Start().
+  void Recover(const LogEntry& e) override {
+    if (!e.has_term()) {  // LogEntry from server.
+      if (e.has_index()) {  // Summary log entry.
+        ProcessLogEntry(e, true);
+        Commit(true);
+      } else if (e.has_config()) {  // Initial configuration.
+        config_.CopyFrom(e.config());
+        ConfigChanged();
+      }
+    } else {  // LogEntry has passed through Raft.
+      if (e.term() > term_) NewTerm(e.term(), e.leader(), true);
+      if (e.has_config_committed()) config_committed_ = e.config_committed();
+      if (e.has_data_committed()) data_committed_ = e.data_committed();
+      ProcessLogEntry(e, true);
+      Commit(true);
+    }
+  }
+
+  // Seeds the election RNG and announces the recovered configuration and
+  // leader to the server.
+  void Start(double now, int64_t seed) override {
+    last_heartbeat_ = now;
+    srand48_r(seed, &rand_);
+    double r = 0.0;
+    drand48_r(&rand_, &r);
+    random_election_delay_ = election_timeout_ * r;
+    if (ConfigChanged())
+      NewTerm(term_ + 1, leader_, true);
+    else
+      vote_ = node_;  // Conservatively assume we called a vote for ourself.
+    server_->ConfigChange(this, config_);
+    server_->LeaderChange(this, leader_);
+  }
+
+  void Tick(double now) override {
+    // Call a vote if no heartbeat has been observed for longer than the
+    // randomized election timeout.
+    if (i_am_in_nodes() && !other_nodes_.empty() &&
+        now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
+      double r = 0.0;
+      drand48_r(&rand_, &r);
+      random_election_delay_ = election_timeout_ * r;
+      last_heartbeat_ = now;
+      VoteForMe();
+      return;
+    }
+    // Send heartbeats at 1/4 of timeout to allow for lost packets/connections.
+    if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
+      last_heartbeat_sent_ = now;
+      ReplicateAll(true);
+    }
+  }
+
+  // Appends a new entry after the current log tail. Leader only.
+  void Propose(const LogEntry& e) override {
+    assert(i_am_leader());
+    LogEntry entry(e);
+    entry.set_term(term_);
+    entry.set_index(index_ + 1);
+    entry.set_previous_log_term(last_log_term_);
+    entry.set_previous_log_index(index_);
+    ProcessLogEntry(entry, false);
+    ReplicateAll(false);
+    Commit(false);
+  }
+
+  void Run(double now, const Message& m) override {
+    if (m.term() >= term_) seen_term_ = true;
+    if (m.term() < term_) return;  // Ignore messages from terms gone by.
+    if (m.term() > term_) NewTerm(m.term(), m.leader(), false);
+    // From here on m.term() == term_.
+    if (m.leader() != "" && leader_ != m.leader() &&
+        other_nodes_.count(m.from())) {  // Only from nodes I acknowledge.
+      leader_ = m.leader();
+      server_->LeaderChange(this, leader_);
+    }
+    auto& n = node_state_[m.from()];
+    if (n.term != m.term()) {  // First message from this node this term.
+      n.term = m.term();
+      n.vote = "";
+    }
+    n.last_log_term = m.last_log_term();
+    n.last_log_index = m.last_log_index();
+    if (m.from() != leader_ || m.has_vote()) {
+      HandleAck(now, m, &n);
+      if (m.has_vote()) HandleVote(m, &n);
+      return;
+    }
+    // A message from the leader: it counts as a heartbeat.
+    last_heartbeat_ = now;
+    if (m.config_committed() > config_committed_ ||
+        m.data_committed() > data_committed_) {
+      config_committed_ = m.config_committed();
+      data_committed_ = m.data_committed();
+      WriteInternalLogEntry();
+    }
+    if (m.has_entry())
+      Ack(ProcessLogEntry(m.entry(), false));
+    else
+      Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
+    Commit(false);
+  }
+
+  // Fills `entries` with the Raft metadata that must follow a compressed log:
+  // the committed configuration (with vote and commit indexes), any pending
+  // configuration not already among the uncommitted entries and, if
+  // `uncommitted`, every entry still waiting to be committed.
+  void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) override {
+    entries->clear();
+    LogEntry config_e;
+    config_e.set_term(config_.term());
+    config_e.set_index(config_.index());
+    config_e.set_vote(vote_);
+    config_e.set_data_committed(data_committed_);
+    config_e.set_config_committed(config_committed_);
+    config_e.mutable_config()->CopyFrom(config_);
+    entries->push_back(config_e);
+    if (pending_config_.has_term() &&
+        (!waiting_commits_.size() ||  // If it isn't in the waiting_commits.
+         waiting_commits_.front()->index() > pending_config_.index())) {
+      LogEntry pending_e;
+      pending_e.set_term(pending_config_.term());
+      pending_e.set_index(pending_config_.index());
+      pending_e.mutable_config()->CopyFrom(pending_config_);
+      entries->push_back(pending_e);
+    }
+    if (uncommitted)
+      for (auto& e : waiting_commits_) entries->push_back(*e);
+  }
+
+  // Clean shutdown: a leader tries to hand leadership to a successor.
+  void Stop() override { Abdicate(); }
+
+ private:
+  // What this node knows about another node.
+  struct NodeState {
+    int64_t term = -1;              // Term of the last message received.
+    int64_t sent_term = 0;          // Term of the last entry replicated.
+    int64_t sent_index = 0;         // Last index replicated to the node.
+    int64_t last_log_term = -1;     // As reported by the node.
+    int64_t last_log_index = -1;    // As reported by the node.
+    double ack_received = -1.0e10;  // Time of the last ack (HandleAck).
+    ::std::string vote;             // The node's vote in `term`.
+  };
+
+  // Returns a message stamped with this node's term, log position, leader
+  // and commit indexes.
+  Message InitializeMessage() const {
+    Message m;
+    m.set_term(term_);
+    m.set_last_log_term(last_log_term_);
+    m.set_last_log_index(index_);
+    m.set_from(node_);
+    m.set_leader(leader_);
+    m.set_data_committed(data_committed_);
+    m.set_config_committed(config_committed_);
+    return m;
+  }
+
+  // Moves to `term`, clearing the vote and dropping uncommitted entries.
+  // Outside recovery the change is persisted and announced.
+  void NewTerm(int64_t term, const ::std::string& new_leader,
+               bool in_recovery) {
+    vote_ = "";
+    term_ = term;
+    leader_ = new_leader;  // Safe even when new_leader aliases leader_.
+    waiting_commits_.clear();
+    if (!in_recovery) {
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
+    }
+  }
+
+  // Starts a new election term with this node as candidate (unless this term
+  // is already such an election) and (re)sends the vote request.
+  void VoteForMe() {
+    if (seen_term_ || leader_ != "" || vote_ != node_) {
+      vote_ = node_;
+      term_++;
+      leader_ = "";
+      waiting_commits_.clear();
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
+      seen_term_ = false;
+    }
+    Vote();
+  }
+
+  // Broadcasts my candidacy, or sends my vote to the candidate I chose.
+  void Vote() {
+    Message m(InitializeMessage());
+    m.set_vote(vote_);
+    if (vote_ == node_)
+      SendToReplicas(m);
+    else
+      server_->SendMessage(this, vote_, m);
+  }
+
+  void HandleVote(const Message& m, NodeState* n) {
+    n->vote = m.vote();
+    if (vote_.empty()) {  // I have not voted yet.
+      if (m.vote() == node_) {  // Abdication.
+        VoteForMe();
+      } else if (m.last_log_term() >= last_log_term_ &&
+                 m.last_log_index() >= index_) {
+        // Vote for candidate if it is at least as up to date as we are.
+        vote_ = m.vote();
+        WriteInternalLogEntry();
+        Vote();
+      }
+    } else if (vote_ == node_ && node_ == n->vote) {  // A vote for me.
+      size_t votes = 1;  // My own vote.
+      for (auto& o : other_config_nodes_) {
+        auto& s = node_state_[o];
+        if (s.term == term_ && s.vote == node_) votes++;
+      }
+      // Strict majority of the nodes (including pending config nodes).
+      if (votes > (other_config_nodes_.size() + 1) / 2) {
+        leader_ = node_;
+        WriteInternalLogEntry();
+        server_->LeaderChange(this, leader_);
+        HeartBeat();  // Inform the others.
+      }
+    }
+  }
+
+  // Replies to the leader. A nack rewinds the local log position to the last
+  // committed entry and reports that position so the leader resends from it.
+  void Ack(bool ack) {
+    Message m(InitializeMessage());
+    if (!ack) {  // Reset local log state to last committed.
+      m.set_nack(true);
+      m.set_last_log_term(last_log_committed_term_);
+      m.set_last_log_index(last_log_committed_index_);
+      index_ = last_log_committed_index_;
+      last_log_term_ = last_log_committed_term_;
+    }
+    server_->SendMessage(this, leader_, m);
+  }
+
+  // Records an ack from `n`. A nack rewinds the node's replication stream to
+  // its reported position. On the leader, acks from a majority since the
+  // last heartbeat count as a heartbeat and may advance the commit indexes.
+  void HandleAck(double now, const Message& m, NodeState* n) {
+    n->ack_received = now;
+    if (m.nack()) {
+      n->sent_index = n->last_log_index;
+      n->sent_term = n->last_log_term;
+    } else if (i_am_leader()) {
+      int acks_needed = static_cast<int>((other_nodes_.size() + 1) / 2);
+      for (auto& o : other_nodes_)
+        if (node_state_[o].ack_received >= last_heartbeat_sent_) acks_needed--;
+      if (acks_needed <= 0) last_heartbeat_ = now;
+      UpdateCommitted();
+    }
+  }
+
+  void HeartBeat() {
+    Message m(InitializeMessage());
+    SendToReplicas(m);
+  }
+
+  void SendToReplicas(const Message& m) {
+    for (auto& n : replicas_) server_->SendMessage(this, n, m);
+  }
+
+  // If leader, starts a new term voting for the most up-to-date other node.
+  void Abdicate() {
+    if (!i_am_leader()) return;
+    // Attempt to pass leadership to a worthy successor.
+    const ::std::string* best_node = nullptr;
+    NodeState* best = nullptr;
+    for (auto& n : other_nodes_) {
+      auto& s = node_state_[n];
+      if (!best || (s.last_log_term > best->last_log_term ||
+                    (s.last_log_term == best->last_log_term &&
+                     s.last_log_index > best->last_log_index))) {
+        best_node = &n;
+        best = &s;
+      }
+    }
+    if (best_node) {
+      term_++;
+      leader_ = "";
+      vote_ = *best_node;
+      WriteInternalLogEntry();
+      Message m(InitializeMessage());
+      m.set_vote(vote_);
+      server_->SendMessage(this, vote_, m);
+    }
+  }
+
+  // Persists a Local entry (no index) holding term, leader, vote and commits.
+  void WriteInternalLogEntry() {
+    LogEntry e;
+    e.set_term(term_);
+    e.set_leader(leader_);
+    e.set_vote(vote_);
+    e.set_data_committed(data_committed_);
+    e.set_config_committed(config_committed_);
+    server_->WriteLogEntry(this, e);
+  }
+
+  // Applies `e` to the local log. Returns false if `e` does not follow on
+  // from the local log (the caller then nacks). Otherwise returns true,
+  // including when `e` was already seen or carries no index.
+  bool ProcessLogEntry(const LogEntry& e, bool in_recovery) {
+    if (e.has_config()) {
+      pending_config_.CopyFrom(e.config());
+      pending_config_.set_term(e.term());
+      pending_config_.set_index(e.index());
+      ConfigChanged();
+    }
+    if (e.has_index()) {  // Not an internal entry.
+      if (e.index() <= index_) return true;  // Already seen this.
+      ::std::unique_ptr<LogEntry> entry(new LogEntry(e));
+      if (!entry->has_term()) {  // Summary, fill in the internal bits.
+        entry->set_term(term_);
+        index_ = entry->index() - 1;  // Summary need not have an extent().
+        entry->set_previous_log_term(last_log_term_);
+        entry->set_previous_log_index(index_);
+      }
+      if (e.term() < last_log_term_) return true;  // Already seen this.
+      if (e.term() == last_log_term_ && e.index() <= index_) return true;
+      if (entry->previous_log_term() != last_log_term_ ||
+          entry->previous_log_index() != index_)
+        return false;  // Out of sequence.
+      if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
+        return false;  // Out of sequence.
+      last_log_term_ = entry->term();
+      index_ = entry->index() + entry->extent();
+      if (!in_recovery && i_am_leader()) {
+        // With no other voters, the leader alone is a majority.
+        if (!other_nodes_.size()) data_committed_ = index_;
+        if (!other_config_nodes_.size()) config_committed_ = index_;
+      }
+      entry->set_data_committed(data_committed_);
+      entry->set_config_committed(config_committed_);
+      if (!in_recovery) server_->WriteLogEntry(this, *entry);
+      waiting_commits_.push_back(::std::move(entry));
+    }
+    return true;
+  }
+
+  // Returns the highest log index that a strict majority of {this node} plus
+  // `other` is known to have reached, based on the last_log_index each node
+  // last reported.
+  int64_t MajorityIndex(const ::std::set<::std::string>& other) {
+    ::std::vector<int64_t> indices(1, index_);
+    for (auto& o : other) indices.push_back(node_state_[o].last_log_index);
+    ::std::sort(indices.begin(), indices.end());
+    // After an ascending sort, at least (size - k) nodes have reached
+    // indices[k]. A strict majority needs size / 2 + 1 nodes, so
+    // k = (size - 1) / 2. (indices[size / 2] is reached by only half of the
+    // nodes when size is even, e.g. only the leader in a 2-node group.)
+    return indices[(indices.size() - 1) / 2];
+  }
+
+  // Leader only: advances data_committed_ (and config_committed_ for a
+  // pending configuration) to what a majority has reached, then commits and
+  // informs the other nodes.
+  void UpdateCommitted() {
+    int64_t i = MajorityIndex(other_nodes_);
+    if (i > data_committed_) {
+      data_committed_ = i;
+      WriteInternalLogEntry();
+      Commit(false);
+      HeartBeat();
+    }
+    if (pending_config_.has_term()) {  // If a pending configuration change.
+      int64_t ci = MajorityIndex(other_config_nodes_);
+      // config_committed must be <= data_committed, so the new
+      // configuration must also concur with the new data_committed.
+      if (i == ci && ci > config_committed_) {
+        config_committed_ = ci;
+        WriteInternalLogEntry();
+        Commit(false);
+        HeartBeat();
+        // NOTE(review): Abdicate() returns immediately unless i_am_leader(),
+        // so this call can never take effect. Confirm the intended condition
+        // (perhaps a leader that the new configuration removed).
+        if (!i_am_leader() && other_nodes_.size() > 1) Abdicate();
+      }
+    }
+  }
+
+  // Hands every waiting entry with index <= data_committed_ to the server,
+  // in log order, then applies a committed pending configuration.
+  void Commit(bool in_recovery) {
+    ::std::vector<::std::unique_ptr<LogEntry>> pending;
+    while (!waiting_commits_.empty() &&
+           waiting_commits_.front()->index() <= data_committed_) {
+      ::std::unique_ptr<LogEntry> e = ::std::move(waiting_commits_.front());
+      waiting_commits_.pop_front();
+      // A later entry whose index does not exceed an earlier pending one
+      // supersedes it (the log was rewritten), so drop the superseded ones.
+      while (!pending.empty() && e->index() <= pending.back()->index())
+        pending.pop_back();
+      pending.push_back(::std::move(e));
+    }
+    for (auto& e : pending) {
+      server_->CommitLogEntry(this, *e);
+      last_log_committed_term_ = e->term();
+      last_log_committed_index_ = e->index();
+    }
+    CommitConfig(in_recovery);
+  }
+
+  // Makes the pending configuration live once config_committed_ covers it.
+  void CommitConfig(bool in_recovery) {
+    if (pending_config_.has_term() && pending_config_.term() == term_ &&
+        pending_config_.index() <= config_committed_) {
+      config_.Swap(&pending_config_);
+      pending_config_.Clear();
+      server_->ConfigChange(this, config_);
+      if (ConfigChanged()) {
+        NewTerm(term_ + 1, leader_, in_recovery);
+        if (!in_recovery) HeartBeat();
+      }
+    }
+  }
+
+  // Recomputes the cached node sets from config_ and pending_config_ and
+  // adjusts leader_ for degenerate configurations.
+  // Returns: true if leader_ changed.
+  bool ConfigChanged() {
+    other_nodes_.clear();
+    other_config_nodes_.clear();
+    replicas_.clear();
+    for (auto& n : config_.node())
+      if (n != node_) {
+        other_nodes_.insert(n);
+        other_config_nodes_.insert(n);
+      }
+    for (auto& n : pending_config_.node())
+      if (n != node_) other_config_nodes_.insert(n);
+    replicas_.insert(config_.replica().begin(), config_.replica().end());
+    replicas_.insert(pending_config_.replica().begin(),
+                     pending_config_.replica().end());
+    replicas_.insert(other_nodes_.begin(), other_nodes_.end());
+    replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
+    ::std::string old_leader = leader_;
+    if (!other_nodes_.size())
+      leader_ = node_;
+    else if (!i_am_in_nodes() && other_nodes_.size() == 1)
+      leader_ = *other_nodes_.begin();
+    else if (leader_ == node_ && !i_am_in_nodes())
+      leader_ = "";
+    return leader_ != old_leader;
+  }
+
+  // Sends `entry` to node `n`; on success records it as sent in *s.
+  bool SendReplicationMessage(const ::std::string& n, const LogEntry& entry,
+                              NodeState* s) {
+    Message m(InitializeMessage());
+    m.mutable_entry()->CopyFrom(entry);
+    if (!server_->SendMessage(this, n, m)) return false;
+    s->sent_index = entry.index() + entry.extent();
+    s->sent_term = entry.term();
+    return true;
+  }
+
+  // Streams to node `n` everything after what it has been sent: first
+  // committed history (via GetLogEntry), then the uncommitted entries. Sends
+  // a bare heartbeat if `heartbeat` and nothing else was sent.
+  void Replicate(const ::std::string& n, bool heartbeat) {
+    bool sent = false;
+    auto& s = node_state_[n];
+    if (s.term == term_) {  // Replica has acknowledged me as leader.
+      int64_t end = index_;
+      if (waiting_commits_.size()) end = waiting_commits_.front()->index() - 1;
+      while (s.sent_index < end) {  // Get from server.
+        LogEntry entry;
+        server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
+        if (!entry.has_term()) {
+          // A summary log entry from the server with historical information.
+          entry.set_term(last_log_term_);
+          entry.set_index(s.sent_index + 1);
+        }
+        entry.set_previous_log_term(s.sent_term);
+        entry.set_previous_log_index(s.sent_index);
+        assert(entry.index() > s.sent_index);
+        int64_t x = s.sent_index;
+        if (!SendReplicationMessage(n, entry, &s)) break;
+        assert(s.sent_index > x);
+        sent = true;
+      }
+      for (auto& e : waiting_commits_) {
+        if (e->index() <= s.sent_index)  // Skip those already sent.
+          continue;
+        if (!SendReplicationMessage(n, *e, &s)) break;
+        sent = true;
+      }
+    }
+    if (heartbeat && !sent) {
+      Message m(InitializeMessage());
+      server_->SendMessage(this, n, m);
+    }
+  }
+
+  void ReplicateAll(bool heartbeat) {
+    for (auto& n : replicas_) Replicate(n, heartbeat);
+  }
+
+  bool i_am_leader() const { return node_ == leader_; }
+  bool i_am_in_nodes() const {
+    auto& n = config_.node();
+    return std::find(n.begin(), n.end(), node_) != n.end();
+  }
+
+  Server* const server_;
+  struct drand48_data rand_;  // State for drand48_r (a GNU extension).
+  const ::std::string node_;
+  int64_t term_ = 0;  // Current term.
+  int64_t last_log_term_ = -1;  // Term of last log entry this node has.
+  int64_t index_ = 0;  // Index of last log entry this node has.
+  int64_t config_committed_ = -1;
+  int64_t data_committed_ = -1;
+  int64_t last_log_committed_index_ = -1;
+  int64_t last_log_committed_term_ = -1;
+  double election_timeout_ = 1.0;
+  double last_heartbeat_ = -1.0e10;
+  double last_heartbeat_sent_ = -1.0e10;
+  double random_election_delay_ = 0.0;
+  ::std::string leader_;  // The current leader. "" if there is no leader.
+  ::std::string vote_;  // My vote this term.
+  Config config_;
+  Config pending_config_;
+  ::std::map<::std::string, NodeState> node_state_;
+  ::std::deque<std::unique_ptr<LogEntry>> waiting_commits_;
+  bool seen_term_ = true;
+  // Cached values.
+  ::std::set<::std::string> other_nodes_;  // Nodes required for consensus on log entries.
+  ::std::set<::std::string> other_config_nodes_;  // Nodes required for config changes.
+  ::std::set<::std::string> replicas_;  // All nodes receiving the replication stream.
+};
+
+// Factory for the Raft interface: builds a RaftImpl bound to `server`
+// (borrowed) for the node named `node`. The caller owns the result.
+template <typename Server>
+Raft<Server>* NewRaft(Server* server, const ::std::string& node) {
+  RaftImpl<Server>* impl = new RaftImpl<Server>(server, node);
+  return impl;
+}
+} // namespace raft
+#endif // CONSENSUS_IMPL_H_
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft_test.cc
----------------------------------------------------------------------
diff --git a/lib/raft/raft_test.cc b/lib/raft/raft_test.cc
new file mode 100644
index 0000000..b52ef11
--- /dev/null
+++ b/lib/raft/raft_test.cc
@@ -0,0 +1,783 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "raft.h"
+
+#include <algorithm>
+#include <deque>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "raft.pb.h"
+#include "raft_impl.h"
+#include "gtest/gtest.h"
+
+using ::std::deque;
+using ::std::map;
+using ::std::set;
+using ::std::string;
+using ::std::unique_ptr;
+using ::std::vector;
+using ::std::pair;
+using ::std::to_string;
+
+// NOTE(review): not referenced anywhere in this test file; candidate for removal.
+const int kMaxServers = 10;
+
+namespace raft {
+
+class RaftTest;
+
+// In-memory Raft host used by the tests: it records written and committed log
+// entries, and the latest leader/config notifications, and routes messages
+// through the RaftTest fixture.
+class RaftServer : public RaftServerInterface {
+ public:
+ typedef RaftMessagePb Message;
+ typedef RaftLogEntryPb LogEntry;
+ typedef RaftConfigPb Config;
+ typedef Raft<RaftServer> RaftClass;
+
+ // Creates a server named `node` together with its Raft instance. `test` is
+ // the fixture that routes this server's outgoing messages (not owned).
+ RaftServer(const string& node, RaftTest* test) : test_(test), node_(node) {
+ raft_.reset(NewRaft(this, node));
+ }
+
+ // Hands `message` to the fixture for delivery to `node`. Defined after
+ // RaftTest, because it needs the complete fixture type.
+ bool SendMessage(RaftClass* raft, const string& node,
+ const Message& message);
+
+ // Copies into `entry` the first stored entry whose term is >= `term` and
+ // whose index is >= `start`. Searches commits_ when use_commit_log_ is set and
+ // log_ otherwise; in log_, entries without an index are skipped. Clears
+ // `entry` if nothing matches. `end` is not used by this test double.
+ void GetLogEntry(RaftClass* raft, int64_t term, int64_t start,
+ int64_t end, LogEntry* entry) {
+ if (use_commit_log_) {
+ for (auto& e : commits_) {
+ if (e->term() < term) continue;
+ if (e->index() >= start) {
+ entry->CopyFrom(*e);
+ return;
+ }
+ }
+ } else {
+ for (auto& e : log_) {
+ if (e->term() < term) continue;
+ if (e->has_index() && e->index() >= start) {
+ entry->CopyFrom(*e);
+ return;
+ }
+ }
+ }
+ entry->Clear();
+ }
+
+ // Appends a copy of `entry` to the log that CrashAndRecover() replays.
+ void WriteLogEntry(RaftClass* raft, const LogEntry& entry) {
+ log_.emplace_back(new LogEntry(entry));
+ }
+
+ // Records a committed entry. Data of the form "key=value" also sets
+ // state_[key] to (entry index, value), modelling a tiny key/value store.
+ void CommitLogEntry(RaftClass* raft, const LogEntry& entry) {
+ commits_.emplace_back(new LogEntry(entry));
+ const string& s = entry.data();
+ auto p = s.find('=');
+ if (p != string::npos)
+ state_[s.substr(0, p)] = make_pair(entry.index(), s.substr(p + 1));
+ }
+
+ // Remembers the most recently announced leader ("" when there is none).
+ void LeaderChange(RaftClass* raft, const string& leader) {
+ leader_ = leader;
+ }
+
+ // Remembers a copy of the most recently announced config.
+ void ConfigChange(RaftClass* raft, const Config& config) {
+ config_.reset(new Config(config));
+ }
+
+ bool use_commit_log_ = false; // Serve GetLogEntry() from commits_ instead of log_.
+ RaftTest* test_; // Fixture that routes messages; not owned.
+ unique_ptr<Config> config_; // Last config seen via ConfigChange().
+ string node_; // This server's name.
+ string leader_; // Last leader seen via LeaderChange().
+ unique_ptr<RaftClass> raft_;
+ vector<unique_ptr<LogEntry>> log_; // Entries written via WriteLogEntry().
+ vector<unique_ptr<LogEntry>> commits_; // Entries committed, in commit order.
+ map<string, pair<int64_t, string>> state_; // key -> (index, value).
+};
+
+// Comparator that orders values by their `first` member alone (e.g. for
+// sorting std::pair values by key while ignoring the second element).
+template <typename T>
+bool firstless(const T& lhs, const T& rhs) {
+ const auto& left_key = lhs.first;
+ const auto& right_key = rhs.first;
+ return left_key < right_key;
+}
+
+// Fixture that simulates a cluster of RaftServers: a shared clock (now_), a
+// FIFO message queue acting as the network, and a set of partitioned nodes.
+class RaftTest : public ::testing::Test {
+ public:
+ typedef RaftServer::Message Message;
+ typedef RaftServer::LogEntry LogEntry;
+ typedef RaftServer::Config Config;
+
+ // Simulated network send: queues a copy of `message` for `to`, unless either
+ // endpoint is in down_, in which case the message is silently dropped.
+ void SendMessage(const string& from, const string& to,
+ const Message& message) {
+ if (down_.count(from) || down_.count(to)) return;
+ messages_.emplace_back(to, unique_ptr<Message>(new Message(message)));
+ }
+
+ // Delivers queued messages in FIFO order until the queue is empty, including
+ // messages that servers queue while handling earlier ones.
+ void ForwardMessages() {
+ while (!messages_.empty()) {
+ // deque::emplace_back does not invalidate references to existing
+ // elements, so `p` stays valid even if Run() queues more messages.
+ auto& p = messages_.front();
+ for (auto& s : servers_)
+ if (p.first == s->node_) s->raft_->Run(now_, *p.second);
+ messages_.pop_front(); // Also destroys the delivered message.
+ }
+ }
+
+ protected:
+ RaftTest() : now_(0) {}
+
+ // Advances the clock by n ticks of 0.1. After each server's Tick(), every
+ // resulting message is delivered. 20 ticks gives 2 full election timeouts
+ // because the timeouts are random on any given node and vary from [1, 2)
+ // timeouts.
+ void Ticks(int n) {
+ for (int i = 0; i < n; i++) {
+ now_ += 0.1;
+ for (auto& s : servers_) {
+ s->raft_->Tick(now_);
+ ForwardMessages();
+ }
+ }
+ }
+
+ // Appends n new servers named after their index in servers_. Each one
+ // recovers `config_log_entry` and is started.
+ // NOTE(review): Start() is passed time 0 even when now_ > 0, unlike
+ // CrashAndRecover(); existing tests may depend on this.
+ void StartUp(int n, const LogEntry& config_log_entry) {
+ int offset = servers_.size();
+ for (int i = offset; i < n + offset; i++) {
+ servers_.emplace_back(new RaftServer(to_string(i), this));
+ auto& raft = *servers_[i]->raft_.get();
+ raft.Recover(config_log_entry);
+ raft.Start(0, i);
+ }
+ }
+
+ // Simulates a crash that keeps the persistent log. Server i is replaced by a
+ // fresh instance, which recovers the config entry and then each logged entry
+ // in order (re-appending them to its log), and is restarted at now_.
+ void CrashAndRecover(int i, const LogEntry& config_log_entry) {
+ vector<unique_ptr<LogEntry>> log(std::move(servers_[i]->log_));
+ servers_[i].reset(new RaftServer(to_string(i), this));
+ auto& raft = *servers_[i]->raft_.get();
+ raft.Recover(config_log_entry);
+ for (auto& p : log) {
+ raft.Recover(*p);
+ servers_[i]->log_.emplace_back(std::move(p));
+ }
+ raft.Start(now_, i);
+ }
+
+ // Simulates a crash that loses all persistent state: server i restarts at
+ // now_ knowing only `config_log_entry`.
+ void CrashAndBurn(int i, const LogEntry& config_log_entry) {
+ servers_[i].reset(new RaftServer(to_string(i), this));
+ auto& raft = *servers_[i]->raft_.get();
+ raft.Recover(config_log_entry);
+ raft.Start(now_, i);
+ }
+
+ // Simulates a crash after compacting the log into a snapshot. The log is
+ // rewritten as one "key=value" entry per key (ordered by the index that last
+ // set it), followed by the entries returned by Snapshot(). Then state_ is
+ // cleared and the server recovers from that log.
+ void SnapshotCrashAndRecover(int i, const LogEntry& config_log_entry) {
+ vector<LogEntry> entries;
+ vector<pair<int64_t, string>> state;
+ for (auto& p : servers_[i]->state_)
+ state.push_back(
+ make_pair(p.second.first, p.first + "=" + p.second.second));
+ std::sort(state.begin(), state.end(), firstless<pair<int64_t, string>>);
+ servers_[i]->log_.clear();
+ for (auto& s : state) {
+ unique_ptr<LogEntry> e(new LogEntry);
+ e->set_index(s.first);
+ e->set_data(s.second);
+ servers_[i]->log_.push_back(std::move(e));
+ }
+ auto& raft = *servers_[i]->raft_.get();
+ raft.Snapshot(false, &entries);
+ for (auto& e : entries) servers_[i]->log_.emplace_back(new LogEntry(e));
+ servers_[i]->state_.clear();
+ CrashAndRecover(i, config_log_entry);
+ }
+
+ // Returns a config entry whose node list is "0" .. to_string(n - 1).
+ LogEntry ConfigLogEntry(int n) {
+ LogEntry config_log_entry;
+ for (int i = 0; i < n; i++)
+ config_log_entry.mutable_config()->add_node(to_string(i));
+ return config_log_entry;
+ }
+
+ double now_; // Simulated clock, advanced by Ticks().
+ set<string> down_; // Nodes whose traffic is dropped.
+ vector<unique_ptr<RaftServer>> servers_; // servers_[i] is named to_string(i).
+ deque<pair<string, unique_ptr<Message>>> messages_; // (destination, message).
+};
+
+// Routes an outgoing message through the fixture, which models the network.
+// Always reports success; the fixture may still drop the message when either
+// endpoint is marked down.
+bool RaftServer::SendMessage(RaftClass* raft, const string& node,
+ const Message& message) {
+ RaftTest* const network = test_;
+ network->SendMessage(node_, node, message);
+ return true;
+}
+
+// A lone server started without recovering any config entry elects itself.
+TEST_F(RaftTest, OneEmptyConfig) {
+ servers_.emplace_back(new RaftServer("0", this));
+ auto& raft = *servers_[0]->raft_.get();
+ raft.Start(0, 0);
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->leader_, "0");
+}
+
+// A single-node config [0]: node 0 is expected to become leader.
+TEST_F(RaftTest, One) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->leader_, "0");
+}
+
+// Node 1 starts with config [0] (so it is not a member) after 0 has become
+// leader; it is still expected to learn that 0 is the leader.
+TEST_F(RaftTest, OneTwoNotParticipating) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ // Startup server 0 as leader.
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ // Startup server 1 with config with 0 as leader.
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->leader_, "0");
+ EXPECT_EQ(servers_[1]->leader_, "0");
+}
+
+// Node 0 leads alone, then proposes config [0, 1] to add node 1. Both nodes are
+// expected to agree on leader 0 and to have committed exactly one entry.
+// NOTE(review): the inner `config_log_entry` shadows the outer one.
+TEST_F(RaftTest, OneTwo) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ // Startup server 0 as leader.
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ // Startup server 1 with config with 0 as leader.
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ // Add 1 into consensus.
+ {
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ config_log_entry.mutable_config()->add_node("1");
+ raft.Propose(config_log_entry);
+ Ticks(20);
+ }
+ EXPECT_EQ(servers_[0]->leader_, "0");
+ EXPECT_EQ(servers_[1]->leader_, "0");
+ EXPECT_EQ(servers_[0]->commits_.size(), 1);
+ EXPECT_EQ(servers_[1]->commits_.size(), 1);
+}
+
+// Grow the config from [0] to [0, 1], then propose nodes [1] with 0 kept only
+// as a replica. Leadership is expected to move to node 1 on both servers.
+TEST_F(RaftTest, OneTwoSwitchToTwo) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ // Startup servers 0, and 1 with 0 as leader.
+ StartUp(1, config_log_entry);
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ // Add 1 into consensus.
+ {
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry config_log_entry(ConfigLogEntry(2));
+ raft.Propose(config_log_entry);
+ Ticks(20);
+ }
+ EXPECT_EQ(servers_[0]->leader_, "0");
+ EXPECT_EQ(servers_[1]->leader_, "0");
+ // Switch to only having 1.
+ {
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("1");
+ config_log_entry.mutable_config()->add_replica("0");
+ raft.Propose(config_log_entry);
+ Ticks(20);
+ }
+ EXPECT_EQ(servers_[0]->leader_, "1");
+ EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// Replace config [0] directly with [1] (node 1 was never a member before).
+// Leadership is expected to move to node 1 on both servers.
+TEST_F(RaftTest, OneThenTwo) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ // Startup servers 0 and 1, with 0 as leader.
+ StartUp(1, config_log_entry);
+ StartUp(1, config_log_entry);
+ Ticks(20);
+ // Switch to only having 1.
+ {
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("1");
+ raft.Propose(config_log_entry);
+ Ticks(20);
+ }
+ EXPECT_EQ(servers_[0]->leader_, "1");
+ EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// Two-node config [0, 1]: both nodes are expected to agree on a leader.
+TEST_F(RaftTest, OneAndTwo) {
+ LogEntry config_log_entry(ConfigLogEntry(2));
+ // Startup servers 0, and 1 in nodes.
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ EXPECT_NE(servers_[0]->leader_, "");
+ EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+}
+
+// Three-node config [0, 1, 2]: all three are expected to agree on a leader.
+TEST_F(RaftTest, OneAndTwoAndThree) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1 and 2 in nodes.
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ EXPECT_NE(servers_[0]->leader_, "");
+ EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+ EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
+}
+
+// Config [0, 1, 2] with only 0 and 1 running: the two are still expected to
+// agree on a leader.
+TEST_F(RaftTest, OneAndTwoNotThree) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1 with config [0, 1, 2].
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ EXPECT_NE(servers_[0]->leader_, "");
+ EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+}
+
+// After 0 and 1 settle, start 2 and cut 0 off. Node 0 is expected to lose its
+// leader, while 1 and 2 agree on a leader other than 0.
+TEST_F(RaftTest, OneAndTwoThenTwoAndThree) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1 with config [0, 1, 2].
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ // Startup server 2 with config [0, 1, 2] and down 0.
+ StartUp(1, config_log_entry);
+ down_.insert("0");
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->leader_, "");
+ EXPECT_NE(servers_[1]->leader_, "");
+ EXPECT_NE(servers_[1]->leader_, "0");
+ EXPECT_EQ(servers_[2]->leader_, servers_[1]->leader_);
+}
+
+// The leader of a three-node group Stop()s and is partitioned. The two
+// survivors are expected to agree on a new leader within a single tick.
+TEST_F(RaftTest, OneTwoThreeThenAbdicate) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1, 2 with config [0, 1, 2].
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup: on an empty name it would produce a negative
+ // server index and out-of-range access.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ raft.Stop();
+ down_.insert(to_string(ileader));
+ Ticks(1); // Abdication will cause immediate reelection.
+ EXPECT_NE(servers_[(ileader + 1) % 3]->leader_, "");
+ EXPECT_EQ(servers_[(ileader + 1) % 3]->leader_,
+ servers_[(ileader + 2) % 3]->leader_);
+}
+
+// Fully partition all three nodes after they elect a leader; no node is
+// expected to keep a leader.
+TEST_F(RaftTest, OneTwoThreeThenAllSeparate) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1, 2 with config [0, 1, 2].
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ down_.insert("0");
+ down_.insert("1");
+ down_.insert("2");
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->leader_, "");
+ EXPECT_EQ(servers_[1]->leader_, "");
+ EXPECT_EQ(servers_[2]->leader_, "");
+}
+
+// Partition all three nodes, then heal the partition. The nodes are expected
+// to converge on one leader again.
+TEST_F(RaftTest, OneTwoThreeThenAllSeparateThenTogether) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ // Startup servers 0, 1, 2 with config [0, 1, 2].
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ down_.insert("0");
+ down_.insert("1");
+ down_.insert("2");
+ Ticks(20);
+ down_.clear();
+ Ticks(20);
+ EXPECT_NE(servers_[0]->leader_, "");
+ EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+ EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
+}
+
+// With an empty config, a lone server is expected to log and commit a
+// proposal without any Ticks().
+TEST_F(RaftTest, OneLog) {
+ LogEntry config_log_entry;
+ StartUp(1, config_log_entry);
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ // ASSERT (not EXPECT) the sizes so the indexing below never goes out of range.
+ ASSERT_EQ(servers_[0]->log_.size(), 1u);
+ EXPECT_EQ(servers_[0]->log_[0]->data(), "a");
+ ASSERT_EQ(servers_[0]->commits_.size(), 1u);
+ EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+}
+
+// With an empty config, a lone server is expected to log and commit two
+// proposals in order, without any Ticks().
+TEST_F(RaftTest, OneLogLog) {
+ LogEntry config_log_entry;
+ StartUp(1, config_log_entry);
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ // ASSERT (not EXPECT) the sizes so the indexing below never goes out of range.
+ ASSERT_EQ(servers_[0]->log_.size(), 2u);
+ EXPECT_EQ(servers_[0]->log_[0]->data(), "a");
+ EXPECT_EQ(servers_[0]->log_[1]->data(), "b");
+ ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+}
+
+// Two nodes. The leader's log is expected to hold, in order: its vote, the
+// election record, an entry whose data_committed() is the election's index,
+// then each proposal followed by an entry committing it. Both nodes commit
+// "a" then "b".
+TEST_F(RaftTest, OneTwoLogLog) {
+ LogEntry config_log_entry(ConfigLogEntry(2));
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ int iother = ileader ? 0 : 1;
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ Ticks(20);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ // ASSERT the sizes so the fixed indexes below stay in range on failure.
+ ASSERT_EQ(servers_[ileader]->log_.size(), 7u);
+ EXPECT_NE(servers_[ileader]->log_[0]->vote(), ""); // vote.
+ EXPECT_NE(servers_[ileader]->log_[1]->leader(), ""); // election.
+ EXPECT_EQ(servers_[ileader]->log_[2]->data_committed(),
+ servers_[ileader]->log_[1]->index());
+ EXPECT_EQ(servers_[ileader]->log_[3]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->log_[4]->data_committed(),
+ servers_[ileader]->log_[3]->index());
+ EXPECT_EQ(servers_[ileader]->log_[5]->data(), "b");
+ EXPECT_EQ(servers_[ileader]->log_[6]->data_committed(),
+ servers_[ileader]->log_[5]->index());
+ ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+ ASSERT_EQ(servers_[iother]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[iother]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[iother]->commits_[1]->data(), "b");
+}
+
+// A follower that is partitioned after "a" commits misses "b". Once it
+// rejoins, every node is expected to have committed "a" then "b".
+TEST_F(RaftTest, OneTwoThreeLogDownLogUp) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ Ticks(20);
+ int downer = (ileader + 1) % 3;
+ down_.insert(to_string(downer));
+ Ticks(20);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ ASSERT_EQ(servers_[downer]->commits_.size(), 1u);
+ EXPECT_EQ(servers_[downer]->commits_[0]->data(), "a");
+ down_.clear();
+ Ticks(20);
+ Ticks(20);
+ for (auto i : {0, 1, 2}) {
+ ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+ }
+}
+
+// A follower loses one entry from its persistent log and is then crash-
+// recovered. Replication is expected to restore it, so every node ends up
+// with "a" then "b" committed.
+TEST_F(RaftTest, OneTwoThreeLogLogThreeDamagedLogRestore) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ Ticks(20);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ int downer = (ileader + 1) % 3;
+ // Lose the "a" commit. The log must be long enough for begin() + 3 to name
+ // a real element.
+ ASSERT_GT(servers_[downer]->log_.size(), 3u);
+ servers_[downer]->log_.erase(servers_[downer]->log_.begin() + 3);
+ CrashAndRecover(downer, config_log_entry);
+ Ticks(20);
+ for (auto i : {0, 1, 2}) {
+ ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+ }
+}
+
+// Two nodes commit "a" and "b". Node 2 is then started and added with a
+// config change. Nodes 1 and 2 are expected to commit a, b and the
+// three-node config.
+TEST_F(RaftTest, OneTwoLogLogThenThree) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ config_log_entry.mutable_config()->add_node("1");
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ StartUp(1, config_log_entry); // Start node 2.
+ config_log_entry.mutable_config()->add_node("2");
+ raft.Propose(config_log_entry); // Change config to [0, 1, 2].
+ Ticks(20);
+ ASSERT_EQ(servers_[1]->commits_.size(), 3u);
+ EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+ EXPECT_EQ(servers_[1]->commits_[2]->config().node_size(), 3);
+ // Verify that the log is replicated.
+ ASSERT_EQ(servers_[2]->commits_.size(), 3u);
+ EXPECT_EQ(servers_[2]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[2]->commits_[1]->data(), "b");
+ EXPECT_EQ(servers_[2]->commits_[2]->config().node_size(), 3);
+}
+
+// A lone server commits "a", crashes, and replays its log. The replay is
+// expected to re-commit "a" immediately, before any further Ticks().
+TEST_F(RaftTest, OneRecover) {
+ LogEntry config_log_entry;
+ StartUp(1, config_log_entry);
+ {
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ }
+ Ticks(20);
+ CrashAndRecover(0, config_log_entry);
+ ASSERT_EQ(servers_[0]->commits_.size(), 1u);
+ EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+}
+
+// The leader of three nodes loses all persistent state after committing
+// "a" and "b". It is expected to get both entries back from the other nodes.
+TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+ CrashAndBurn(ileader, config_log_entry);
+ Ticks(20);
+ // Verify that the log is replicated.
+ for (auto i : {0, 1, 2}) {
+ ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+ }
+}
+
+// In five nodes, the leader and one follower crash but keep their logs.
+// After recovery both are expected to have committed "a" then "b".
+TEST_F(RaftTest, FiveCrashLeaderAndAnotherAndRecover) {
+ LogEntry config_log_entry(ConfigLogEntry(5));
+ StartUp(5, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ int iother = (ileader + 1) % 5;
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+ CrashAndRecover(ileader, config_log_entry);
+ CrashAndRecover(iother, config_log_entry);
+ Ticks(20);
+ // Verify that the log is replicated.
+ ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+ ASSERT_EQ(servers_[iother]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[iother]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[iother]->commits_[1]->data(), "b");
+}
+
+// In five nodes, the leader and one follower lose all persistent state. The
+// remaining majority is expected to re-replicate "a" and "b" to both.
+TEST_F(RaftTest, FiveCrashAndBurnLeaderAndAnother) {
+ LogEntry config_log_entry(ConfigLogEntry(5));
+ StartUp(5, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ int iother = (ileader + 1) % 5;
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ CrashAndBurn(ileader, config_log_entry);
+ CrashAndBurn(iother, config_log_entry);
+ Ticks(20);
+ // Verify that the log is replicated.
+ ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+ ASSERT_EQ(servers_[iother]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[iother]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[iother]->commits_[1]->data(), "b");
+}
+
+// Tests that entries proposed by a leader without a quorum are never committed.
+// Entries at the same indexes, proposed later by a leader that has a quorum,
+// are committed everywhere.
+TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
+ LogEntry config_log_entry(ConfigLogEntry(5));
+ StartUp(5, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ down_.insert(to_string((ileader + 1) % 5));
+ down_.insert(to_string((ileader + 2) % 5));
+ down_.insert(to_string((ileader + 3) % 5));
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ down_.clear();
+ down_.insert(to_string((ileader + 4) % 5));
+ down_.insert(to_string(ileader));
+ Ticks(20);
+ // The three reconnected nodes must have elected a new leader.
+ ASSERT_FALSE(servers_[(ileader + 1) % 5]->leader_.empty());
+ int ileader2 = servers_[((ileader + 1) % 5)]->leader_[0] - '0';
+ auto& raft2 = *servers_[ileader2]->raft_.get();
+ log_entry.set_data("c");
+ raft2.Propose(log_entry);
+ log_entry.set_data("d");
+ raft2.Propose(log_entry);
+ Ticks(20);
+ down_.clear();
+ Ticks(20);
+ Ticks(20);
+ for (auto i : {0, 1, 2, 3, 4}) {
+ ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[i]->commits_[0]->data(), "c");
+ EXPECT_EQ(servers_[i]->commits_[1]->data(), "d");
+ }
+}
+
+// Node 0 leads, with node 1 as a replica. Both restart under a swapped config
+// (node 1 leads, node 0 is a replica): node 0 loses its state, node 1 keeps
+// its log. Both are expected to end up with "a", "b" and leader 1.
+TEST_F(RaftTest, ReplicaFailover) {
+ LogEntry config_log_entry;
+ config_log_entry.mutable_config()->add_node("0");
+ config_log_entry.mutable_config()->add_replica("1");
+ StartUp(2, config_log_entry);
+ Ticks(20);
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a");
+ raft.Propose(log_entry);
+ log_entry.set_data("b");
+ raft.Propose(log_entry);
+ Ticks(20);
+ ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+ ASSERT_EQ(servers_[1]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+ EXPECT_EQ(servers_[0]->leader_, "0");
+ EXPECT_EQ(servers_[1]->leader_, "0");
+ config_log_entry.mutable_config()->clear_node();
+ config_log_entry.mutable_config()->clear_replica();
+ config_log_entry.mutable_config()->add_node("1");
+ config_log_entry.mutable_config()->add_replica("0");
+ CrashAndBurn(0, config_log_entry);
+ CrashAndRecover(1, config_log_entry);
+ Ticks(20);
+ // Verify that the log is replicated.
+ ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+ ASSERT_EQ(servers_[1]->commits_.size(), 2u);
+ EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+ EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+ EXPECT_EQ(servers_[0]->leader_, "1");
+ EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// Single node: apply "key=value" entries, then snapshot, crash and recover.
+// The key/value state is expected to be rebuilt from the compacted log.
+// NOTE(review): despite the name, only one server is started.
+TEST_F(RaftTest, OneSnapshotTwo) {
+ LogEntry config_log_entry;
+ StartUp(1, config_log_entry);
+ auto& raft = *servers_[0]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a=1");
+ raft.Propose(log_entry);
+ log_entry.set_data("b=2");
+ raft.Propose(log_entry);
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+ EXPECT_EQ(servers_[0]->state_["b"].second, "2");
+ log_entry.set_data("b=3");
+ raft.Propose(log_entry);
+ Ticks(20);
+ EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+ EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+ SnapshotCrashAndRecover(0, config_log_entry);
+ Ticks(20);
+ // Verify that the state is restored.
+ EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+ EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+}
+
+// Nodes 0 and 1 compact their logs into snapshots and recover, while node 2
+// loses everything. Nodes 0 and 2 are expected to end with a=1, b=3.
+// NOTE(review): node 1's state is not checked.
+TEST_F(RaftTest, OneTwoThreeSnapshotOneTwoCrashAndBurnThree) {
+ LogEntry config_log_entry(ConfigLogEntry(3));
+ StartUp(3, config_log_entry);
+ Ticks(20);
+ // Guard the leader_[0] lookup against an empty leader name.
+ ASSERT_FALSE(servers_[0]->leader_.empty());
+ int ileader = servers_[0]->leader_[0] - '0';
+ auto& raft = *servers_[ileader]->raft_.get();
+ LogEntry log_entry;
+ log_entry.set_data("a=1");
+ raft.Propose(log_entry);
+ log_entry.set_data("b=2");
+ raft.Propose(log_entry);
+ Ticks(20);
+ log_entry.set_data("b=3");
+ raft.Propose(log_entry);
+ Ticks(20);
+ SnapshotCrashAndRecover(0, config_log_entry);
+ SnapshotCrashAndRecover(1, config_log_entry);
+ CrashAndBurn(2, config_log_entry);
+ Ticks(20);
+ // Verify that the state is restored.
+ EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+ EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+ EXPECT_EQ(servers_[2]->state_["a"].second, "1");
+ EXPECT_EQ(servers_[2]->state_["b"].second, "3");
+}
+} // namespace raft
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/test_makefile
----------------------------------------------------------------------
diff --git a/lib/raft/test_makefile b/lib/raft/test_makefile
new file mode 100644
index 0000000..4520307
--- /dev/null
+++ b/lib/raft/test_makefile
@@ -0,0 +1,140 @@
+# Build modes: uncomment OPTIMIZE or PROFILE (or pass e.g. `make OPTIMIZE=1`).
+#OPTIMIZE=1
+DEBUG=1
+#PROFILE=1
+PREFIX=/usr/local
+OUT_DIR=.
+
+# Use default if no configuration specified.
+# NOTE: GNU make predefines CXX (g++) and AR (ar), so these ?= lines only take
+# effect when built-in variables are disabled (make -R).
+CXX ?= g++
+#CXX = clang++
+AR ?= ar
+
+PROTOC = protoc
+# Prepend the build directory (where protoc writes *_pb2.py) to any inherited
+# Python module search path.
+PYTHONPATH := .:$(PYTHONPATH)
+
+.PHONY: all depend install
+
+ifdef OPTIMIZE
+CXXFLAGS += -O3
+endif
+ifdef DEBUG
+CXXFLAGS += -ggdb
+endif
+ifdef PROFILE
+CXXFLAGS += -O3 -g
+endif
+
+# Platform detection. OS_TYPE is the part of `uname -s` before the first "_"
+# (e.g. "CYGWIN"). OS_VERSION joins the first three "."-separated components of
+# `uname -r` into one number. NOTE(review): OS_VERSION is not used in this file.
+OS_TYPE = $(shell uname -s | \
+ awk '{ split($$1,a,"_"); printf("%s", a[1]); }')
+OS_VERSION = $(shell uname -r | \
+ awk '{ split($$1,a,"."); sub("V","",a[1]); \
+ printf("%d%d%d",a[1],a[2],a[3]); }')
+# Normalize the various 32-bit x86 machine names to "x86".
+ARCH = $(shell uname -m)
+ifeq ($(ARCH),i386)
+ ARCH = x86
+endif
+ifeq ($(ARCH),i486)
+ ARCH = x86
+endif
+ifeq ($(ARCH),i586)
+ ARCH = x86
+endif
+ifeq ($(ARCH),i686)
+ ARCH = x86
+endif
+
+# Refuse to build on 32-bit x86, except on Darwin.
+ifeq ($(ARCH),x86)
+ifneq ($(OS_TYPE),Darwin)
+# Darwin lies (presumably `uname -m` can report a 32-bit arch on 64-bit Macs),
+# so the check is skipped there.
+$(error 64-bit required)
+endif
+endif
+# On Darwin, `s` also writes an archive index.
+# NOTE(review): neither ARFLAGS nor AR is used by any rule in this file.
+ifeq ($(OS_TYPE),Darwin)
+ ARFLAGS = crvs
+else
+ ARFLAGS = crv
+endif
+
+# Link librt/libpthread explicitly everywhere except Cygwin and Darwin.
+ifneq ($(OS_TYPE),CYGWIN)
+ifneq ($(OS_TYPE),Darwin)
+ LIBS += -lrt -lpthread
+endif
+endif
+
+
+# Function for getting a set of source files.
+# $(1) is the directory to search. It matches *.c, *.cpp and *.cc but not
+# *_test.cc. find's -a binds tighter than -or, so the exclusion applies only to
+# the *.cc pattern, the only one a *_test.cc file can match.
+# NOTE(review): get_srcs is not called anywhere in this file.
+get_srcs = $(shell find $(1) -name \*.c -or -name \*.cpp -or -name \*.cc -a ! -name \*_test.cc | tr "\n" " ")
+
+# Protocol buffer inputs and the files protoc generates from them.
+PROTOS = raft
+PROTO_SOURCES = $(addsuffix .pb.cc,$(PROTOS))
+PROTO_INCLUDES = $(addsuffix .pb.h,$(PROTOS))
+PROTO_PYTHON = $(addsuffix _pb2.py,$(PROTOS))
+# Every *_test.cc under the current directory builds a test binary of the same
+# name without the extension.
+TEST_SOURCES = $(shell find . -name \*_test.cc)
+TESTS = $(basename $(TEST_SOURCES))
+SOURCES = $(PROTO_SOURCES) $(TEST_SOURCES)
+
+GTEST_DIR = /usr/include/gtest
+GTEST_LIB = /usr/lib
+
+# NOTE(review): OPT is not set in this file; presumably it is supplied on the
+# command line.
+CXXFLAGS += $(OPT)
+CXXFLAGS += -std=c++11
+CXXFLAGS += -I. -I./include -I./util -I./common -I$(GTEST_DIR) \
+ -I/usr/local/include -I/usr/include
+# The backticks are passed through verbatim and run by the shell in each recipe.
+CXXFLAGS += `pkg-config --cflags protobuf`
+
+LIBS += -L/usr/local/lib/ -L/usr/lib -lgflags
+#LIBS += -L/usr/local/lib/ -L/usr/lib
+LIBS += `pkg-config --libs protobuf`
+
+# Map source file names to object / dependency file names.
+# NOTE(review): source_to_object is not used in this file.
+source_to_object = $(addsuffix .o,$(basename $(1)))
+source_to_depend = $(addsuffix .d,$(basename $(1)))
+
+default: all
+
+all: $(TESTS)
+
+# These targets name actions, not files.
+.PHONY: default test clean
+
+depend: $(call source_to_depend,$(SOURCES))
+ @echo Building $(call source_to_depend,$(SOURCES))
+
+# Scratch directory passed to each test binary. It is recreated for every test
+# and removed once all tests pass. It was previously never defined, which made
+# `mkdir` fail with a missing operand. Override with, e.g.,
+# `make test TEST_TMPDIR=/tmp/raft`.
+TEST_TMPDIR ?= $(CURDIR)/test_tmpdir
+
+test: $(TESTS)
+ for t in $^; \
+ do \
+ echo "***** Running $$t"; \
+ rm -rf "$(TEST_TMPDIR)"; \
+ mkdir -p "$(TEST_TMPDIR)"; \
+ ./$$t --test_tmpdir="$(TEST_TMPDIR)" || exit 1; \
+ done; \
+ rm -rf "$(TEST_TMPDIR)"
+ @echo "All tests pass!"
+
+clean:
+ -rm -f `find . -type f -name '*.pb.*'`
+ -rm -f `find . -type f -name '*_pb2.py'`
+ -rm -f `find . -type f -name '*.o'`
+ -rm -f `find . -type f -name '*.pyc'`
+ -rm -f `find . -type f -name '*.d'`
+ -rm -f $(PROTO_INCLUDES) $(PROTO_SOURCES) $(PROTO_PYTHON)
+ -rm -f $(TESTS)
+ -rm -rf "$(TEST_TMPDIR)"
+
+# Link each test against the generated protobuf object and static googletest.
+# Static archives and libraries must follow the objects that reference them:
+# libgtest_main.a needs libgtest.a, and both need $(LIBS) (e.g. -lpthread),
+# so system libraries go last.
+%_test: raft.pb.o %_test.cc
+ $(CXX) -o $@ $*_test.cc raft.pb.o $(CXXFLAGS) $(GTEST_LIB)/libgtest_main.a $(GTEST_LIB)/libgtest.a $(LIBS)
+
+# One protoc run produces the C++ sources/headers and the Python module.
+%.pb.cc %.pb.h %_pb2.py: %.proto
+ $(PROTOC) $^ --cpp_out=. --python_out=.
+
+# MODULE_INCLUDES and CODEGEN_FILES are not set in this file (they expand
+# empty).
+%.o: %.cc $(MODULE_INCLUDES) $(CODEGEN_FILES)
+ $(CXX) -c $*.cc -o $(OUT_DIR)/$@ $(CXXFLAGS)
+
+# Header dependency lists, consumed by the -include at the bottom.
+%.d: %.cc $(MODULE_INCLUDES) $(CODEGEN_FILES)
+ $(CXX) -MM $*.cc $(CXXFLAGS) > $*.d
+
+%.cc: %.lex
+ flex -o $@ $^
+
+%.cc %.h: %.y
+ bison --output=$*.cc --defines=$*.h $^
+
+
+# Skip generating and including dependency files when only cleaning.
+ifneq ($(MAKECMDGOALS),clean)
+ -include $(call source_to_depend,$(SOURCES))
+endif