You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2015/07/21 23:54:33 UTC

trafficserver git commit: TS-3786: Implementation of the RAFT consensus protocol.

Repository: trafficserver
Updated Branches:
  refs/heads/master 3184b5521 -> 38e1d741f


TS-3786: Implementation of the RAFT consensus protocol.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/38e1d741
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/38e1d741
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/38e1d741

Branch: refs/heads/master
Commit: 38e1d741f2056a7893a0a90452d37a29ae9bf707
Parents: 3184b55
Author: John Plevyak <jp...@google.com>
Authored: Tue Jul 21 10:16:48 2015 -0700
Committer: John Plevyak <jp...@google.com>
Committed: Tue Jul 21 14:53:26 2015 -0700

----------------------------------------------------------------------
 lib/raft/raft.h        | 129 ++++++++
 lib/raft/raft.proto    |  92 ++++++
 lib/raft/raft_impl.h   | 530 ++++++++++++++++++++++++++++++
 lib/raft/raft_test.cc  | 783 ++++++++++++++++++++++++++++++++++++++++++++
 lib/raft/test_makefile | 140 ++++++++
 5 files changed, 1674 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft.h b/lib/raft/raft.h
new file mode 100644
index 0000000..f926a11
--- /dev/null
+++ b/lib/raft/raft.h
@@ -0,0 +1,129 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RAFT_H_
+#define RAFT_H_
+// An implementation of the RAFT consensus algorithm:
+//   https://ramcloud.stanford.edu/raft.pdf
+//
+// Features:
+//   * Leader election
+//   * Log replication
+//   * Snapshotting
+//   * Configuration updates including changing the set of nodes participating.
+//   * Resistant to failures (i.e. complete/partial log/message loss).
+//
+// Servers need to implement the functionality in RaftServerInterface.  A
+// single server may have many Raft objects.
+//
+// On startup:
+//   MyServer server;
+//   Raft<MyServer>* raft(NewRaft(&server, node_name));
+//   Initialize the log with the initial config if this is the first run:
+//     create an empty log entry, set the initial config and write it.
+//   for (log_entry : log)
+//     Recover(log_entry);
+//       expect CommitLogEntry calls.
+//   raft->Start(now, random_seed);  // int64_t seed for election-delay RNG.
+//     expect ConfigChange() and LeaderChange()
+//
+// Main loop (executed by the user code till done):
+//   Call Tick(now) initially and periodically, e.g. every 25 msecs.
+//     now is monotonically increasing time in seconds (double)
+//   On a message from a node, call Run(now, message)
+//      expect SendMessage(), GetLogEntry(), WriteLogEntry(),
+//      CommitLogEntry(), LeaderChange() and ConfigChange() calls.
+//      Note: WriteLogEntry() is blocking, so if you are using Stubby, shift
+//            to another (non-Stubby) thread before calling raft.Run()
+//   On periodic snapshot, compress the log and call Snapshot() to
+//     get entries with the raft meta data (and optionally uncommitted
+//     entries) which should appear at the end of any compressed log/snapshot.
+//     If CommitLogEntry() is idempotent the snapshot can be taken incrementally
+//     and a conservative log tail can be retained.
+// When done call Stop()
+//   expect SendMessage() etc. calls.
+// delete the Raft object.
+//
+// On the leader:
+//    Periodically call Propose() with a new log entry.  This may include data
+//    or config, neither or both.
+//
+// Changing the nodes participating:
+//   Configuration changes involving changes in the number of nodes require
+//   agreement from a majority of both the new and old configurations.  Until
+//   the configuration change has been accepted, the old quorum can continue
+//   to commit log entries, resulting in the config_committed falling behind
+//   the data_commit.  Once both quorums have accepted the new configuration,
+//   the next commit will require both quorums and will update both the
+//   data_commit and config_commit at which point ConfigChange() will be called
+//   and the new configuration will be live.
+//
+//   This class is thread-unsafe (wrap it with a lock) and not reentrant.
+#include <string>
+#include <vector>
+
+namespace raft {
+template <typename Server>
+class Raft {
+ public:
+  using Message = typename Server::Message;
+  using LogEntry = typename Server::LogEntry;
+
+  virtual ~Raft() = default;
+
+  // Election timeout in seconds (the implementation defaults to 1.0).
+  virtual void SetElectionTimeout(double seconds) = 0;
+
+  // Startup: replay each persisted log entry via Recover(), then Start().
+  virtual void Recover(const LogEntry& entry) = 0;
+  virtual void Start(double now, int64_t random_seed) = 0;
+  // Periodic driver; call roughly every election_timeout / 10.
+  virtual void Tick(double now) = 0;
+  // Leader only: append a new entry (data and/or config) to the log.
+  virtual void Propose(const LogEntry& entry) = 0;
+  // Deliver one message received from another node.
+  virtual void Run(double now, const Message& message) = 0;
+  // Collect the metadata entries that must trail a compressed log; with
+  // `uncommitted`, the not-yet-committed entries are appended as well.
+  virtual void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) = 0;
+  // Clean shutdown so that another node can take over faster.
+  virtual void Stop() = 0;
+};
+// Factory for a Raft participating as `node`.  `server` is borrowed, not
+// owned by the Raft; the caller owns (and eventually deletes) the result.
+template <typename Server>
+Raft<Server>* NewRaft(Server* server, const ::std::string& node);
+
+// The Server template argument of Raft must conform to this interface.
+class RaftServerInterface {
+ public:
+  using RaftClass = Raft<RaftServerInterface>;
+  class Config;    // Shape given by RaftConfigPb in raft.proto.
+  class LogEntry;  // Shape given by RaftLogEntryPb in raft.proto.
+  class Message;   // Shape given by RaftMessagePb in raft.proto.
+
+  // Every callback receives the calling RaftClass so that one server can
+  // host several Raft instances and tell them apart.
+
+  // Hand `message` off for delivery to `node`; true if it was accepted.
+  bool SendMessage(RaftClass* raft, const ::std::string& node,
+                   const Message& message);
+  // Fill `entry` with log contents starting at `index` (entries from terms
+  // before `term` are not wanted) up to `end`: either an entry exactly as
+  // written, or, for committed entries, one summarizing their changes.
+  void GetLogEntry(RaftClass* raft, int64_t term, int64_t index,
+                   int64_t end, LogEntry* entry);
+  // Persist `entry`; return only once it is durable.
+  void WriteLogEntry(RaftClass* raft, const LogEntry& entry);
+  // Apply a committed `entry` to the server state.
+  void CommitLogEntry(RaftClass* raft, const LogEntry& entry);
+  // Report a leader change; an empty `leader` means there is none.
+  void LeaderChange(RaftClass* raft, const ::std::string& leader);
+  // Report that a new configuration is live.
+  void ConfigChange(RaftClass* raft, const Config& config);
+};
+}  // namespace raft
+#endif  // RAFT_H_

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft.proto
----------------------------------------------------------------------
diff --git a/lib/raft/raft.proto b/lib/raft/raft.proto
new file mode 100644
index 0000000..8b35043
--- /dev/null
+++ b/lib/raft/raft.proto
@@ -0,0 +1,92 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+
+package raft;
+
+// This file describes the requirements of the consensus algorithm.
+
+// For Raft<Server>, Server::Config must contain these fields.
+message RaftConfigPb {
+  repeated string node = 1;     // Nodes participating in the consensus.
+  // Replicas not participating in consensus; they still receive the
+  // replication stream.
+  repeated string replica = 2;
+  // Internal fields: set by the consensus algorithm to the term/index of
+  // the log entry that carried this configuration.
+  optional int64 term = 3;
+  optional int64 index = 4;
+
+  extensions 100 to max;
+}
+
+// For Raft<Server>, Server::LogEntry must contain these fields.
+//
+// PUBLIC
+// The only fields which should be set by the user are:
+//   'data' which can be used to store any user data.  It is otherwise unused.
+//   'config' which can be set to propose a configuration change.
+//   'index' should be set in response to GetLogEntry() either because the
+//      entry has come from the log (where it was set by Raft) or for
+//      summaries, by the user.
+//   'extent' can be set for Propose() and summaries from GetLogEntry() to
+//      indicate that the log covers a range of indexes.
+//
+// PRIVATE
+//   There are 3 types of consensus log entries:
+//      User: when !has_term(), the entry has come from the user:
+//         * the initial configuration which was prepended to the log manually.
+//         * summaries in a compressed log or from GetLogEntry().
+//      Local: when !has_index(), the entry stores only internal state:
+//         * the leader, vote and data_committed, config_committed fields.
+//      Consensus: replicatable log entries which have been through consensus.
+//         * User log entries are converted to Consensus log entries on ingest.
+message RaftLogEntryPb {
+  // External fields: set by the user.
+  optional bytes data = 1;    // Available for user data.
+  // A proposed configuration; it is pending until committed.
+  optional RaftConfigPb config = 2;
+  optional int64 index = 3;   // Monotonic log entry index.
+  // Indexes covered are [index, index + extent]; unset (0) covers one index.
+  optional int64 extent = 4;
+  // External/Internal fields: set by consensus, readable by user.
+  optional int64 term = 5;    // When !has_term() directly from user.
+  // Internal fields: set by the consensus algorithm, purely internal.
+  // Position of the preceding entry; used to reject out-of-sequence entries.
+  optional int64 previous_log_term = 6;
+  optional int64 previous_log_index = 7;
+  // Local fields: set by consensus, only persisted on the local node.
+  optional string leader = 8;
+  optional int64 data_committed = 9;     // Index of committed data.
+  optional int64 config_committed = 10;  // Index of committed config.
+  optional string vote = 11;             // Vote in the term.
+
+  extensions 100 to max;
+}
+
+// For Raft<Server>, Server::Message must contain these fields.
+// PRIVATE
+message RaftMessagePb {
+  optional int64 term = 1;   // Sender's term; receivers ignore older terms.
+  optional string from = 2;  // Node this message is from.
+  optional string leader = 3;  // Sender's view of the leader ("" if none).
+  optional int64 data_committed = 4;
+  optional int64 config_committed = 5;
+  // NOTE(review): field numbers 6 and 7 are unassigned here; consider
+  // `reserved 6, 7;` if they were ever used on the wire.
+  optional RaftLogEntryPb entry = 8;  // Log entry being replicated.
+  // Acknowledgement.
+  optional bool nack = 9;  // Reset the stream to last_log_term/index.
+  // Voting and acknowledgement.
+  // Sender's last log position (its last committed position on a nack).
+  optional int64 last_log_term = 10;
+  optional int64 last_log_index = 11;
+  // Voting.
+  optional string vote = 12;  // Node the sender votes for this term.
+
+  extensions 100 to max;
+}

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft_impl.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft_impl.h b/lib/raft/raft_impl.h
new file mode 100644
index 0000000..10248b3
--- /dev/null
+++ b/lib/raft/raft_impl.h
@@ -0,0 +1,530 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef CONSENSUS_IMPL_H_
+#define CONSENSUS_IMPL_H_
+#include <assert.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <algorithm>
+#include <deque>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "raft.h"
+
+namespace raft {
+
+template <typename Server>
+class RaftImpl : public Raft<Server> {
+ public:
+  typedef typename Server::Message Message;
+  typedef typename Server::LogEntry LogEntry;
+  typedef typename Server::Config Config;
+
+  // `server` is not owned and receives every callback; `node` is this node's
+  // name as it appears in Config::node()/replica() and Message::from().
+  RaftImpl(Server* server, const ::std::string &node)
+      : server_(server), node_(node) {}
+  ~RaftImpl() override {}
+
+  void SetElectionTimeout(double timeout) override {
+    election_timeout_ = timeout;
+  }
+
+  // Replays one persisted log entry at startup.  Entries without a term were
+  // written by the user (the initial config or a summary); entries with a
+  // term were written by Raft and also restore term and commit state.
+  void Recover(const LogEntry& e) override {
+    if (!e.has_term()) {    // LogEntry from server.
+      if (e.has_index()) {  // Summary log entry.
+        ProcessLogEntry(e, true);
+        Commit(true);
+      } else if (e.has_config()) {
+        config_.CopyFrom(e.config());
+        ConfigChanged();
+      }
+    } else {  // LogEntry has passed through Raft.
+      if (e.term() > term_) NewTerm(e.term(), e.leader(), true);
+      if (e.has_config_committed()) config_committed_ = e.config_committed();
+      if (e.has_data_committed()) data_committed_ = e.data_committed();
+      ProcessLogEntry(e, true);
+      Commit(true);
+    }
+  }
+
+  // Seeds the RNG used for randomized election delays, settles the initial
+  // leader from the recovered config and reports config and leader to the
+  // server.
+  void Start(double now, int64_t seed) override {
+    last_heartbeat_ = now;
+    srand48_r(seed, &rand_);
+    double r = 0.0;
+    drand48_r(&rand_, &r);
+    random_election_delay_ = election_timeout_ * r;
+    if (ConfigChanged())
+      NewTerm(term_ + 1, leader_, true);
+    else
+      vote_ = node_;  // Conservatively assume we called a vote for ourself.
+    server_->ConfigChange(this, config_);
+    server_->LeaderChange(this, leader_);
+  }
+
+  // Starts an election when nothing has been heard from a leader for the
+  // election timeout plus a random delay; as leader, replicates (or sends a
+  // heartbeat) every quarter of the election timeout.
+  void Tick(double now) override {
+    if (i_am_in_nodes() && !other_nodes_.empty() &&
+        now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
+      double r = 0.0;
+      drand48_r(&rand_, &r);
+      random_election_delay_ = election_timeout_ * r;
+      last_heartbeat_ = now;
+      VoteForMe();
+      return;
+    }
+    // Send heartbeats at 1/4 of timeout to allow for lost packets/connections.
+    if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
+      last_heartbeat_sent_ = now;
+      ReplicateAll(true);
+    }
+  }
+
+  // Leader only: appends `e` to the log in the current term, replicates it
+  // and commits whatever has become committed.
+  void Propose(const LogEntry& e) override {
+    assert(i_am_leader());
+    LogEntry entry(e);
+    entry.set_term(term_);
+    entry.set_index(index_ + 1);
+    entry.set_previous_log_term(last_log_term_);
+    entry.set_previous_log_index(index_);
+    ProcessLogEntry(entry, false);
+    ReplicateAll(false);
+    Commit(false);
+  }
+
+  // Processes one message received from another node.
+  void Run(double now, const Message& m) override {
+    if (m.term() >= term_) seen_term_ = true;
+    if (m.term() < term_) return;  // Ignore messages from terms gone by.
+    if (m.term() > term_) NewTerm(m.term(), m.leader(), false);
+    // From here on m.term() == term_.
+    if (m.leader() != "" && leader_ != m.leader() &&
+        other_nodes_.count(m.from())) {  // Only from nodes I acknowledge.
+      leader_ = m.leader();
+      server_->LeaderChange(this, leader_);
+    }
+    auto& n = node_state_[m.from()];
+    if (n.term != term_) {  // First message from this node in this term.
+      n.term = term_;
+      n.vote = "";
+    }
+    n.last_log_term = m.last_log_term();
+    n.last_log_index = m.last_log_index();
+    if (m.from() != leader_ || m.has_vote()) {
+      HandleAck(now, m, &n);
+      if (m.has_vote()) HandleVote(m, &n);
+      return;
+    }
+    // A replication message or heartbeat from the current leader.
+    last_heartbeat_ = now;
+    if (m.config_committed() > config_committed_ ||
+        m.data_committed() > data_committed_) {
+      config_committed_ = m.config_committed();
+      data_committed_ = m.data_committed();
+      WriteInternalLogEntry();
+    }
+    if (m.has_entry())
+      Ack(ProcessLogEntry(m.entry(), false));
+    else
+      Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
+    Commit(false);
+  }
+
+  // Fills `entries` with the Raft metadata that must trail a compressed log:
+  // the committed config (with vote and commit indexes), any pending config
+  // not already covered by waiting_commits_, and, if `uncommitted`, every
+  // uncommitted entry.
+  void Snapshot(bool uncommitted, ::std::vector<LogEntry>* entries) override {
+    entries->clear();
+    LogEntry config_e;
+    config_e.set_term(config_.term());
+    config_e.set_index(config_.index());
+    config_e.set_vote(vote_);
+    config_e.set_data_committed(data_committed_);
+    config_e.set_config_committed(config_committed_);
+    config_e.mutable_config()->CopyFrom(config_);
+    entries->push_back(config_e);
+    if (pending_config_.has_term() &&
+        (waiting_commits_.empty() ||  // If it isn't in the waiting_commits.
+         waiting_commits_.front()->index() > pending_config_.index())) {
+      LogEntry pending_e;
+      pending_e.set_term(pending_config_.term());
+      pending_e.set_index(pending_config_.index());
+      pending_e.mutable_config()->CopyFrom(pending_config_);
+      entries->push_back(pending_e);
+    }
+    if (uncommitted)
+      for (auto& e : waiting_commits_) entries->push_back(*e);
+  }
+
+  void Stop() override { Abdicate(); }
+
+ private:
+  // What this node knows about another node.
+  struct NodeState {
+    int64_t term = -1;
+    int64_t sent_term = 0;   // Log position replicated to the node so far.
+    int64_t sent_index = 0;
+    int64_t last_log_term = -1;   // Log position the node last reported.
+    int64_t last_log_index = -1;
+    double ack_received = -1.0e10;
+    ::std::string vote;
+  };
+
+  // Builds a message carrying my term, log position, leader and commits.
+  Message InitializeMessage() {
+    Message m;
+    m.set_term(term_);
+    m.set_last_log_term(last_log_term_);
+    m.set_last_log_index(index_);
+    m.set_from(node_);
+    m.set_leader(leader_);
+    m.set_data_committed(data_committed_);
+    m.set_config_committed(config_committed_);
+    return m;
+  }
+
+  // Enters `term` under `new_leader`, clearing my vote and uncommitted
+  // entries; outside recovery the change is persisted and reported.
+  // `new_leader` may alias leader_ (self-assignment is harmless).
+  void NewTerm(int64_t term, const ::std::string& new_leader,
+               bool in_recovery) {
+    vote_ = "";
+    term_ = term;
+    leader_ = new_leader;
+    waiting_commits_.clear();
+    if (!in_recovery) {
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
+    }
+  }
+
+  // Starts a candidacy in a new term, unless I am already a leaderless
+  // candidate and no message from this term or newer has been seen since,
+  // in which case my existing vote is just re-sent.
+  void VoteForMe() {
+    if (seen_term_ || leader_ != "" || vote_ != node_) {
+      vote_ = node_;
+      term_++;
+      leader_ = "";
+      waiting_commits_.clear();
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
+      seen_term_ = false;
+    }
+    Vote();
+  }
+
+  // Sends my vote: to every replica when voting for myself, otherwise only
+  // to the node I voted for.
+  void Vote() {
+    Message m(InitializeMessage());
+    m.set_vote(vote_);
+    if (vote_ == node_)
+      SendToReplicas(m);
+    else
+      server_->SendMessage(this, vote_, m);
+  }
+
+  // Records the vote carried by `m`.  If I have not voted, an abdication in
+  // my favour starts my candidacy; otherwise I vote for a candidate whose
+  // log is at least as far along as mine.  As a candidate, I become leader
+  // once a strict majority of the config nodes (including me) votes for me.
+  void HandleVote(const Message& m, NodeState* n) {
+    n->vote = m.vote();
+    if (vote_.empty()) {        // I have not voted yet.
+      if (m.vote() == node_) {  // Abdication.
+        VoteForMe();
+      } else if (m.last_log_term() >= last_log_term_ &&
+                 m.last_log_index() >= index_) {
+        // Vote for candidate if it is at least as up to date as we are.
+        vote_ = m.vote();
+        WriteInternalLogEntry();
+        Vote();
+      }
+    } else if (vote_ == node_ && node_ == n->vote) {
+      int votes = 0;
+      for (auto& o : other_config_nodes_) {
+        auto& s = node_state_[o];
+        if (s.term == term_ && s.vote == node_) votes++;
+      }
+      // My own vote plus the others' must exceed half of all nodes.
+      if (votes + 1 > static_cast<int>(other_config_nodes_.size() + 1) / 2) {
+        leader_ = node_;
+        WriteInternalLogEntry();
+        server_->LeaderChange(this, leader_);
+        HeartBeat();  // Inform the others.
+      }
+    }
+  }
+
+  // Acknowledges to the leader.  On a NACK my log position is rewound to the
+  // last committed entry and the leader is asked to resend from there.
+  void Ack(bool ack) {
+    Message m(InitializeMessage());
+    if (!ack) {  // Reset local log state to last committed.
+      m.set_nack(true);
+      m.set_last_log_term(last_log_committed_term_);
+      m.set_last_log_index(last_log_committed_index_);
+      index_ = last_log_committed_index_;
+      last_log_term_ = last_log_committed_term_;
+    }
+    server_->SendMessage(this, leader_, m);
+  }
+
+  // Records an acknowledgement.  A NACK rewinds what I believe I have sent
+  // to that node.  As leader, acks from a majority since the last heartbeat
+  // count as a heartbeat, and the commit indexes are advanced.
+  void HandleAck(double now, const Message& m, NodeState* n) {
+    n->ack_received = now;
+    if (m.nack()) {
+      n->sent_index = n->last_log_index;
+      n->sent_term = n->last_log_term;
+    } else if (i_am_leader()) {
+      int acks_needed = static_cast<int>((other_nodes_.size() + 1) / 2);
+      for (auto& o : other_nodes_)
+        if (node_state_[o].ack_received >= last_heartbeat_sent_) acks_needed--;
+      if (acks_needed <= 0) last_heartbeat_ = now;
+      UpdateCommitted();
+    }
+  }
+
+  void HeartBeat() {
+    Message m(InitializeMessage());
+    SendToReplicas(m);
+  }
+
+  void SendToReplicas(const Message& m) {
+    for (auto& n : replicas_) server_->SendMessage(this, n, m);
+  }
+
+  // Leader only: passes leadership to the node with the most advanced log
+  // by starting a new term and voting for that node.
+  void Abdicate() {
+    if (!i_am_leader()) return;
+    // Attempt to pass leadership to a worthy successor.
+    const ::std::string* best_node = nullptr;
+    NodeState* best = nullptr;
+    for (auto& n : other_nodes_) {
+      auto& s = node_state_[n];
+      if (!best || (s.last_log_term > best->last_log_term ||
+                    (s.last_log_term == best->last_log_term &&
+                     s.last_log_index > best->last_log_index))) {
+        best_node = &n;
+        best = &s;
+      }
+    }
+    if (best_node) {
+      term_++;
+      leader_ = "";
+      vote_ = *best_node;
+      WriteInternalLogEntry();
+      Message m(InitializeMessage());
+      m.set_vote(vote_);
+      server_->SendMessage(this, vote_, m);
+    }
+  }
+
+  // Persists local-only state (term, leader, vote, commit indexes) as an
+  // entry without an index.
+  void WriteInternalLogEntry() {
+    LogEntry e;
+    e.set_term(term_);
+    e.set_leader(leader_);
+    e.set_vote(vote_);
+    e.set_data_committed(data_committed_);
+    e.set_config_committed(config_committed_);
+    server_->WriteLogEntry(this, e);
+  }
+
+  // Applies a log entry from recovery, Propose() or the leader.  A config
+  // becomes pending_config_; an indexed entry is checked against my log
+  // position, written (unless recovering) and queued in waiting_commits_.
+  // Returns false only for an out-of-sequence entry (Run() then NACKs).
+  bool ProcessLogEntry(const LogEntry& e, bool in_recovery) {
+    if (e.has_config()) {
+      pending_config_.CopyFrom(e.config());
+      pending_config_.set_term(e.term());
+      pending_config_.set_index(e.index());
+      ConfigChanged();
+    }
+    if (e.has_index()) {  // Not an internal entry.
+      if (e.index() <= index_) return true;  // Already seen this.
+      std::unique_ptr<LogEntry> entry(new LogEntry(e));
+      if (!entry->has_term()) {  // Summary, fill in the internal bits.
+        entry->set_term(term_);
+        index_ = entry->index() - 1;  // Summary need not have an extent().
+        entry->set_previous_log_term(last_log_term_);
+        entry->set_previous_log_index(index_);
+      }
+      // NOTE(review): for summaries these checks read e.term() (unset, so 0)
+      // rather than entry->term(), after index_ was already rewound above;
+      // confirm this is intended.
+      if (e.term() < last_log_term_) return true;  // Already seen this.
+      if (e.term() == last_log_term_ && e.index() <= index_) return true;
+      if ((entry->previous_log_term() != last_log_term_ ||
+           entry->previous_log_index() != index_))
+        return false;  // Out of sequence.
+      if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
+        return false;  // Out of sequence.
+      last_log_term_ = entry->term();
+      index_ = entry->index() + entry->extent();
+      if (!in_recovery && i_am_leader()) {
+        if (!other_nodes_.size()) data_committed_ = index_;
+        if (!other_config_nodes_.size()) config_committed_ = index_;
+      }
+      entry->set_data_committed(data_committed_);
+      entry->set_config_committed(config_committed_);
+      if (!in_recovery) server_->WriteLogEntry(this, *entry);
+      waiting_commits_.emplace_back(entry.release());
+    }
+    return true;
+  }
+
+  // Returns the largest log index that a strict majority of this node plus
+  // `other` have reached (last_log_index >= it).
+  int64_t MajorityIndex(const ::std::set<::std::string>& other) {
+    ::std::vector<int64_t> indices(1, index_);  // Never empty.
+    for (auto& o : other) indices.push_back(node_state_[o].last_log_index);
+    ::std::sort(indices.begin(), indices.end());
+    // Ascending: position k is reached by (size - k) nodes, so (size - 1) / 2
+    // is the highest position still backed by a strict majority.  (size / 2
+    // is backed by only half of the nodes when the count is even.)
+    return indices[(indices.size() - 1) / 2];
+  }
+
+  // Leader: advances data_committed_ (and config_committed_ for a pending
+  // config change) to what a majority has reached, commits and informs the
+  // replicas.
+  void UpdateCommitted() {
+    int64_t i = MajorityIndex(other_nodes_);
+    if (i > data_committed_) {
+      data_committed_ = i;
+      WriteInternalLogEntry();
+      Commit(false);
+      HeartBeat();
+    }
+    if (pending_config_.has_term()) {  // If a pending configuration change.
+      int64_t ci = MajorityIndex(other_config_nodes_);
+      // config_committed must be <= data_committed, so the new
+      // configuration must also concur with the new data_committed.
+      if (i == ci && ci > config_committed_) {
+        config_committed_ = ci;
+        WriteInternalLogEntry();
+        Commit(false);
+        HeartBeat();
+        // NOTE(review): Abdicate() returns at once when !i_am_leader(), so
+        // this call is currently a no-op; confirm the intended condition.
+        if (!i_am_leader() && other_nodes_.size() > 1) Abdicate();
+      }
+    }
+  }
+
+  // Hands each waiting entry with index <= data_committed_ to the server in
+  // order; an entry is dropped when a later one has the same or a lower
+  // index.  Then applies a committed pending config.
+  void Commit(bool in_recovery) {
+    ::std::vector<std::unique_ptr<LogEntry>> pending;
+    while (!waiting_commits_.empty() &&
+           waiting_commits_.front()->index() <= data_committed_) {
+      auto& e = waiting_commits_.front();
+      while (!pending.empty() && e->index() <= pending.back()->index())
+        pending.pop_back();
+      pending.emplace_back(e.release());
+      waiting_commits_.pop_front();
+    }
+    for (auto& e : pending) {
+      server_->CommitLogEntry(this, *e);
+      last_log_committed_term_ = e->term();
+      last_log_committed_index_ = e->index();
+    }
+    CommitConfig(in_recovery);
+  }
+
+  // Installs pending_config_ as config_ once it is committed in the current
+  // term; if that changes the leader, a new term starts.
+  void CommitConfig(bool in_recovery) {
+    if (pending_config_.has_term() && pending_config_.term() == term_ &&
+        pending_config_.index() <= config_committed_) {
+      config_.Swap(&pending_config_);
+      pending_config_.Clear();
+      server_->ConfigChange(this, config_);
+      if (ConfigChanged()) {
+        NewTerm(term_ + 1, leader_, in_recovery);
+        if (!in_recovery) HeartBeat();
+      }
+    }
+  }
+
+  // Recomputes the cached node sets from config_ and pending_config_ and
+  // fixes up leader_ for the single-node and not-a-member cases.
+  bool ConfigChanged() {  // Returns: true if the leader_ changed.
+    other_nodes_.clear();
+    other_config_nodes_.clear();
+    replicas_.clear();
+    for (auto& n : config_.node())
+      if (n != node_) {
+        other_nodes_.insert(n);
+        other_config_nodes_.insert(n);
+      }
+    for (auto& n : pending_config_.node())
+      if (n != node_) other_config_nodes_.insert(n);
+    replicas_.insert(config_.replica().begin(), config_.replica().end());
+    replicas_.insert(pending_config_.replica().begin(),
+                     pending_config_.replica().end());
+    replicas_.insert(other_nodes_.begin(), other_nodes_.end());
+    replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
+    ::std::string old_leader = leader_;
+    if (!other_nodes_.size())
+      leader_ = node_;
+    else if (!i_am_in_nodes() && other_nodes_.size() == 1)
+      leader_ = *other_nodes_.begin();
+    else if (leader_ == node_ && !i_am_in_nodes())
+      leader_ = "";
+    return leader_ != old_leader;
+  }
+
+  // Sends `entry` to `n`; if accepted, records it as sent to that node.
+  bool SendReplicationMessage(const ::std::string& n, const LogEntry& entry,
+                              NodeState* s) {
+    Message m(InitializeMessage());
+    m.mutable_entry()->CopyFrom(entry);
+    if (!server_->SendMessage(this, n, m)) return false;
+    s->sent_index = entry.index() + entry.extent();
+    s->sent_term = entry.term();
+    return true;
+  }
+
+  // Brings `n` up to date if it has acknowledged this term: first from the
+  // server's log (GetLogEntry) up to the oldest uncommitted entry, then from
+  // waiting_commits_, stopping at the first rejected send.  With
+  // `heartbeat`, an empty message is sent if nothing else was.
+  void Replicate(const ::std::string& n, bool heartbeat) {
+    bool sent = false;
+    auto& s = node_state_[n];
+    if (s.term == term_) {  // Replica has acknowledged me as leader.
+      int64_t end = index_;
+      if (waiting_commits_.size()) end = waiting_commits_.front()->index() - 1;
+      while (s.sent_index < end) {  // Get from server.
+        LogEntry entry;
+        server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
+        if (!entry.has_term()) {
+          // A summary log entry from the server with historical information.
+          entry.set_term(last_log_term_);
+          entry.set_index(s.sent_index + 1);
+        }
+        entry.set_previous_log_term(s.sent_term);
+        entry.set_previous_log_index(s.sent_index);
+        assert(entry.index() > s.sent_index);
+        int64_t x = s.sent_index;
+        if (!SendReplicationMessage(n, entry, &s)) break;
+        assert(s.sent_index > x);
+        sent = true;
+      }
+      for (auto& e : waiting_commits_) {
+        if (e->index() <= s.sent_index)  // Skip those already sent.
+          continue;
+        if (!SendReplicationMessage(n, *e, &s)) break;
+        sent = true;
+      }
+    }
+    if (heartbeat && !sent) {
+      Message m(InitializeMessage());
+      server_->SendMessage(this, n, m);
+    }
+  }
+
+  void ReplicateAll(bool heartbeat) {
+    for (auto& n : replicas_) Replicate(n, heartbeat);
+  }
+
+  bool i_am_leader() const { return node_ == leader_; }
+  bool i_am_in_nodes() const {
+    const auto& n = config_.node();
+    return std::find(n.begin(), n.end(), node_) != n.end();
+  }
+
+  Server* const server_;
+  struct drand48_data rand_{};  // Seeded in Start(); zeroed until then.
+  const ::std::string node_;
+  int64_t term_ = 0;            // Current term.
+  int64_t last_log_term_ = -1;  // Term of last log entry this node has.
+  int64_t index_ = 0;           // Index of last log entry this node has.
+  int64_t config_committed_ = -1;
+  int64_t data_committed_ = -1;
+  int64_t last_log_committed_index_ = -1;
+  int64_t last_log_committed_term_ = -1;
+  double election_timeout_ = 1.0;
+  double last_heartbeat_ = -1.0e10;
+  double last_heartbeat_sent_ = -1.0e10;
+  double random_election_delay_ = 0.0;
+  ::std::string leader_;  // The current leader.  "" if there is no leader.
+  ::std::string vote_;    // My vote this term.
+  Config config_;
+  Config pending_config_;
+  ::std::map<::std::string, NodeState> node_state_;
+  ::std::deque<std::unique_ptr<LogEntry>> waiting_commits_;
+  bool seen_term_ = true;
+  // Cached values.
+  ::std::set<::std::string> other_nodes_;  // Nodes required for consensus on log entries.
+  ::std::set<::std::string> other_config_nodes_;  // Nodes required for config changes.
+  ::std::set<::std::string> replicas_;  // All nodes receiving the replication stream.
+};
+
+template <typename Server>
+Raft<Server>* NewRaft(Server* server, const ::std::string &node) {
+  // The caller takes ownership of the returned object; `server` stays
+  // borrowed and is used by every callback.
+  RaftImpl<Server>* impl = new RaftImpl<Server>(server, node);
+  return impl;
+}
+}  // namespace raft
+#endif  // CONSENSUS_IMPL_H_

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/raft_test.cc
----------------------------------------------------------------------
diff --git a/lib/raft/raft_test.cc b/lib/raft/raft_test.cc
new file mode 100644
index 0000000..b52ef11
--- /dev/null
+++ b/lib/raft/raft_test.cc
@@ -0,0 +1,783 @@
+// Copyright 2014 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "raft.h"
+
+#include <algorithm>
+#include <deque>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "raft.pb.h"
+#include "raft_impl.h"
+#include "gtest/gtest.h"
+
+using ::std::deque;
+using ::std::map;
+using ::std::set;
+using ::std::string;
+using ::std::unique_ptr;
+using ::std::vector;
+using ::std::pair;
+using ::std::to_string;
+
+// Upper bound on simulated servers. Tests recover a leader's index with
+// `leader_[0] - '0'`, which only works for single-digit ids, i.e. < 10 servers.
+// NOTE(review): not referenced anywhere in this test file; confirm it is needed.
+const int kMaxServers = 10;
+
+namespace raft {
+
+class RaftTest;
+
+// In-memory test double for the host side of Raft. It records written log
+// entries and commits, applies "key=value" commits to a toy key/value map, and
+// routes outgoing messages through the owning RaftTest fixture.
+class RaftServer : public RaftServerInterface {
+ public:
+  typedef RaftMessagePb Message;
+  typedef RaftLogEntryPb LogEntry;
+  typedef RaftConfigPb Config;
+  typedef Raft<RaftServer> RaftClass;
+
+  // Creates server `node` attached to fixture `test`, plus its Raft instance.
+  // Initializers follow member declaration order (test_ before node_).
+  RaftServer(const string& node, RaftTest* test) : test_(test), node_(node) {
+    raft_.reset(NewRaft(this, node));
+  }
+
+  // Hands `message` for `node` to the fixture; defined after RaftTest because
+  // it needs the complete fixture type.
+  bool SendMessage(RaftClass* raft, const string& node,
+                   const Message& message);
+
+  // Copies into `entry` the first stored entry with term >= `term` and
+  // index >= `start`, searching commits_ when use_commit_log_ is set and log_
+  // otherwise (log_ entries without an index are skipped). `end` is unused.
+  // Clears `entry` when nothing matches.
+  void GetLogEntry(RaftClass* raft, int64_t term, int64_t start,
+                   int64_t end, LogEntry* entry) {
+    if (use_commit_log_) {
+      for (auto& e : commits_) {
+        if (e->term() < term) continue;
+        if (e->index() >= start) {
+          entry->CopyFrom(*e);
+          return;
+        }
+      }
+    } else {
+      for (auto& e : log_) {
+        if (e->term() < term) continue;
+        if (e->has_index() && e->index() >= start) {
+          entry->CopyFrom(*e);
+          return;
+        }
+      }
+    }
+    entry->Clear();
+  }
+
+  // Appends a copy of `entry` to the simulated durable log.
+  void WriteLogEntry(RaftClass* raft, const LogEntry& entry) {
+    log_.emplace_back(new LogEntry(entry));
+  }
+
+  // Records a committed entry. If its data looks like "key=value", stores
+  // (entry index, value) under key in state_.
+  void CommitLogEntry(RaftClass* raft, const LogEntry& entry) {
+    commits_.emplace_back(new LogEntry(entry));
+    const string& s = entry.data();  // Bind, don't copy the payload.
+    const auto p = s.find('=');
+    if (p != string::npos)
+      state_[s.substr(0, p)] = make_pair(entry.index(), s.substr(p + 1));
+  }
+
+  // Remembers the most recently announced leader ("" when there is none).
+  void LeaderChange(RaftClass* raft, const string& leader) {
+    leader_ = leader;
+  }
+
+  // Keeps a copy of the most recently announced configuration.
+  void ConfigChange(RaftClass* raft, const Config& config) {
+    config_.reset(new Config(config));
+  }
+
+  bool use_commit_log_ = false;  // GetLogEntry serves from commits_ when true.
+  RaftTest* test_;               // Owning fixture; not owned here.
+  unique_ptr<Config> config_;    // Last config passed to ConfigChange.
+  string node_;                  // This server's node id.
+  string leader_;                // Last leader passed to LeaderChange.
+  unique_ptr<RaftClass> raft_;
+  vector<unique_ptr<LogEntry>> log_;      // Simulated durable log.
+  vector<unique_ptr<LogEntry>> commits_;  // Entries committed, in order.
+  map<string, pair<int64_t, string>> state_;  // key -> (index, value).
+};
+
+// Comparator (used with std::sort) that orders pair-like values by their
+// `first` member alone, ignoring `second`.
+template <class PairLike>
+bool firstless(const PairLike& lhs, const PairLike& rhs) {
+  const bool ordered = lhs.first < rhs.first;
+  return ordered;
+}
+
+// Fixture that simulates a cluster of RaftServers on a virtual clock with an
+// in-memory FIFO message queue. Messages from or to a node in down_ are
+// dropped, which is how tests model partitions.
+class RaftTest : public ::testing::Test {
+ public:
+  typedef RaftServer::Message Message;
+  typedef RaftServer::LogEntry LogEntry;
+  typedef RaftServer::Config Config;
+
+  // Queues a copy of `message` from `from` to `to` unless either is down.
+  void SendMessage(const string& from, const string& to,
+                   const Message& message) {
+    if (down_.count(from) || down_.count(to)) return;
+    messages_.emplace_back(to, unique_ptr<Message>(new Message(message)));
+  }
+
+  // Delivers queued messages in FIFO order until the queue is empty,
+  // including messages produced while handling earlier ones.
+  void ForwardMessages() {
+    while (!messages_.empty()) {
+      // Take ownership before delivery: the message is freed even if Run()
+      // throws, and Run() may freely append new messages to the queue.
+      pair<string, unique_ptr<Message>> p = std::move(messages_.front());
+      messages_.pop_front();
+      for (auto& s : servers_)
+        if (p.first == s->node_) s->raft_->Run(now_, *p.second);
+    }
+  }
+
+ protected:
+  RaftTest() : now_(0) {}
+
+  // 20 ticks gives 2 full election timeouts because the timeouts are random
+  // on any given node and vary from [1, 2) timeouts.
+  // Each tick advances the clock by 0.1, ticks every server, and drains the
+  // message queue after each server's tick.
+  void Ticks(int n) {
+    for (int i = 0; i < n; i++) {
+      now_ += 0.1;
+      for (auto& s : servers_) {
+        s->raft_->Tick(now_);
+        ForwardMessages();
+      }
+    }
+  }
+
+  // Appends n servers (ids continue after the existing ones), recovers each
+  // from `config_log_entry` and starts it.
+  void StartUp(int n, const LogEntry& config_log_entry) {
+    int offset = static_cast<int>(servers_.size());
+    for (int i = offset; i < n + offset; i++) {
+      servers_.emplace_back(new RaftServer(to_string(i), this));
+      auto& raft = *servers_[i]->raft_.get();
+      raft.Recover(config_log_entry);
+      // NOTE(review): servers added after the clock has advanced are still
+      // started at time 0, whereas CrashAndRecover/CrashAndBurn pass now_;
+      // confirm this is intentional.
+      raft.Start(0, i);
+    }
+  }
+
+  // Simulates a crash that keeps durable storage: replaces server i with a
+  // fresh instance and replays its previous write log through Recover().
+  void CrashAndRecover(int i, const LogEntry& config_log_entry) {
+    vector<unique_ptr<LogEntry>> log = std::move(servers_[i]->log_);
+    servers_[i].reset(new RaftServer(to_string(i), this));
+    auto& raft = *servers_[i]->raft_.get();
+    raft.Recover(config_log_entry);
+    for (auto& p : log) {
+      raft.Recover(*p);
+      servers_[i]->log_.emplace_back(std::move(p));
+    }
+    raft.Start(now_, i);
+  }
+
+  // Simulates a crash that loses all state: server i restarts from the
+  // config entry alone.
+  void CrashAndBurn(int i, const LogEntry& config_log_entry) {
+    servers_[i].reset(new RaftServer(to_string(i), this));
+    auto& raft = *servers_[i]->raft_.get();
+    raft.Recover(config_log_entry);
+    raft.Start(now_, i);
+  }
+
+  // Replaces server i's log with a snapshot and crash-recovers from it. The
+  // snapshot is one "key=value" entry per key in state_ (ordered by the index
+  // that last set it), followed by the entries from Raft::Snapshot(false, ...).
+  void SnapshotCrashAndRecover(int i, const LogEntry& config_log_entry) {
+    vector<LogEntry> entries;
+    vector<pair<int64_t, string>> state;
+    for (auto& p : servers_[i]->state_)
+      state.push_back(
+          make_pair(p.second.first, p.first + "=" + p.second.second));
+    std::sort(state.begin(), state.end(), firstless<pair<int64_t, string>>);
+    servers_[i]->log_.clear();
+    for (auto& s : state) {
+      unique_ptr<LogEntry> e(new LogEntry);
+      e->set_index(s.first);
+      e->set_data(s.second);
+      servers_[i]->log_.emplace_back(std::move(e));
+    }
+    auto& raft = *servers_[i]->raft_.get();
+    raft.Snapshot(false, &entries);
+    for (auto& e : entries) servers_[i]->log_.emplace_back(new LogEntry(e));
+    servers_[i]->state_.clear();
+    CrashAndRecover(i, config_log_entry);
+  }
+
+  // Returns a config entry whose node list is "0", "1", ..., "n-1".
+  LogEntry ConfigLogEntry(int n) {
+    LogEntry config_log_entry;
+    for (int i = 0; i < n; i++)
+      config_log_entry.mutable_config()->add_node(to_string(i));
+    return config_log_entry;
+  }
+
+  double now_;                              // Virtual clock.
+  set<string> down_;                        // Ids of partitioned nodes.
+  vector<unique_ptr<RaftServer>> servers_;  // servers_[i] has id to_string(i).
+  deque<pair<string, unique_ptr<Message>>> messages_;  // (destination, message).
+};
+
+// Defined out of line because it needs the complete RaftTest type. Passes the
+// message to the fixture, which queues it (or drops it if either end is down),
+// and always reports success to Raft.
+bool RaftServer::SendMessage(RaftClass* raft, const string& node,
+                             const Message& message) {
+  RaftTest& fixture = *test_;
+  fixture.SendMessage(node_, node, message);
+  return true;
+}
+
+// A lone server that is started without Recover() (hence with no config)
+// should still elect itself leader.
+TEST_F(RaftTest, OneEmptyConfig) {
+  servers_.emplace_back(new RaftServer("0", this));
+  auto& raft = *servers_[0]->raft_.get();
+  raft.Start(0, 0);
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->leader_, "0");
+}
+
+// A single-node config [0] elects node 0.
+TEST_F(RaftTest, One) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->leader_, "0");
+}
+
+// Node 1 starts with config [0], so it is not in the node list; it should
+// still come to recognize 0 as the leader.
+TEST_F(RaftTest, OneTwoNotParticipating) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  // Startup server 0 as leader.
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  // Startup server 1 with config with 0 as leader.
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->leader_, "0");
+  EXPECT_EQ(servers_[1]->leader_, "0");
+}
+
+// Grows a one-node cluster to two. Node 1 starts outside the node list, then
+// the leader proposes config [0, 1]. That config change should be the only
+// commit on both nodes, and 0 should remain leader.
+TEST_F(RaftTest, OneTwo) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  // Startup server 0 as leader.
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  // Startup server 1 with config with 0 as leader.
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  // Add 1 into consensus.
+  {
+    auto& raft = *servers_[0]->raft_.get();
+    // Distinct name: avoids shadowing the initial config above.
+    LogEntry new_config_log_entry(ConfigLogEntry(2));  // Config [0, 1].
+    raft.Propose(new_config_log_entry);
+    Ticks(20);
+  }
+  EXPECT_EQ(servers_[0]->leader_, "0");
+  EXPECT_EQ(servers_[1]->leader_, "0");
+  // Unsigned literals match size() and avoid sign-compare warnings.
+  EXPECT_EQ(servers_[0]->commits_.size(), 1u);
+  EXPECT_EQ(servers_[1]->commits_.size(), 1u);
+}
+
+// Grows nodes [0] to [0, 1], then reconfigures to nodes [1] with 0 demoted to
+// a replica; both servers should then report 1 as leader.
+TEST_F(RaftTest, OneTwoSwitchToTwo) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  // Startup servers 0, and 1 with 0 as leader.
+  StartUp(1, config_log_entry);
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  // Add 1 into consensus.
+  {
+    auto& raft = *servers_[0]->raft_.get();
+    LogEntry config_log_entry(ConfigLogEntry(2));
+    raft.Propose(config_log_entry);
+    Ticks(20);
+  }
+  EXPECT_EQ(servers_[0]->leader_, "0");
+  EXPECT_EQ(servers_[1]->leader_, "0");
+  // Switch to only having 1.
+  {
+    auto& raft = *servers_[0]->raft_.get();
+    LogEntry config_log_entry;
+    config_log_entry.mutable_config()->add_node("1");
+    config_log_entry.mutable_config()->add_replica("0");
+    raft.Propose(config_log_entry);
+    Ticks(20);
+  }
+  EXPECT_EQ(servers_[0]->leader_, "1");
+  EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// Reconfigures directly from nodes [0] to nodes [1]. Node 0 is neither a node
+// nor a replica afterwards, yet both servers are expected to report 1 as
+// leader.
+TEST_F(RaftTest, OneThenTwo) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  // Startup servers 0 and 1, with 0 as leader.
+  StartUp(1, config_log_entry);
+  StartUp(1, config_log_entry);
+  Ticks(20);
+  // Switch to only having 1.
+  {
+    auto& raft = *servers_[0]->raft_.get();
+    LogEntry config_log_entry;
+    config_log_entry.mutable_config()->add_node("1");
+    raft.Propose(config_log_entry);
+    Ticks(20);
+  }
+  EXPECT_EQ(servers_[0]->leader_, "1");
+  EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// Two voting nodes started together agree on a non-empty leader.
+TEST_F(RaftTest, OneAndTwo) {
+  LogEntry config_log_entry(ConfigLogEntry(2));
+  // Startup servers 0, and 1 in nodes.
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  EXPECT_NE(servers_[0]->leader_, "");
+  EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+}
+
+// Three voting nodes started together agree on a non-empty leader.
+TEST_F(RaftTest, OneAndTwoAndThree) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1 and 2 in nodes.
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  EXPECT_NE(servers_[0]->leader_, "");
+  EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+  EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
+}
+
+// Only nodes 0 and 1 of config [0, 1, 2] are started; two of three is a
+// majority, so they should still elect a common leader.
+TEST_F(RaftTest, OneAndTwoNotThree) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1 with config [0, 1, 2].
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  EXPECT_NE(servers_[0]->leader_, "");
+  EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+}
+
+// After 0 and 1 elect a leader, node 2 starts and node 0 is cut off. Node 0
+// should end up with no leader, while 1 and 2 agree on a leader other than 0.
+TEST_F(RaftTest, OneAndTwoThenTwoAndThree) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1 with config [0, 1, 2].
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  // Startup server 2 with config [0, 1, 2] and down 0.
+  StartUp(1, config_log_entry);
+  down_.insert("0");
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->leader_, "");
+  EXPECT_NE(servers_[1]->leader_, "");
+  EXPECT_NE(servers_[1]->leader_, "0");
+  EXPECT_EQ(servers_[2]->leader_, servers_[1]->leader_);
+}
+
+// After the leader of a three-node cluster calls Stop() and is cut off, the
+// remaining two nodes should agree on a new leader within a single tick.
+TEST_F(RaftTest, OneTwoThreeThenAbdicate) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1, 2 with config [0, 1, 2].
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected: leader_[0] would be '\0' and the
+  // computed index negative (out-of-bounds servers_ access).
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  raft.Stop();
+  down_.insert(to_string(ileader));
+  Ticks(1);  // Abdication will cause immediate reelection.
+  EXPECT_NE(servers_[(ileader + 1) % 3]->leader_, "");
+  EXPECT_EQ(servers_[(ileader + 1) % 3]->leader_,
+            servers_[(ileader + 2) % 3]->leader_);
+}
+
+// Once every node is isolated from every other, no node should keep a leader.
+TEST_F(RaftTest, OneTwoThreeThenAllSeparate) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1, 2 with config [0, 1, 2].
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  down_.insert("0");
+  down_.insert("1");
+  down_.insert("2");
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->leader_, "");
+  EXPECT_EQ(servers_[1]->leader_, "");
+  EXPECT_EQ(servers_[2]->leader_, "");
+}
+
+// Isolate every node, then reconnect them all; the cluster should converge
+// on a single agreed leader again.
+TEST_F(RaftTest, OneTwoThreeThenAllSeparateThenTogether) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  // Startup servers 0, 1, 2 with config [0, 1, 2].
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  down_.insert("0");
+  down_.insert("1");
+  down_.insert("2");
+  Ticks(20);
+  down_.clear();
+  Ticks(20);
+  EXPECT_NE(servers_[0]->leader_, "");
+  EXPECT_EQ(servers_[1]->leader_, servers_[0]->leader_);
+  EXPECT_EQ(servers_[2]->leader_, servers_[0]->leader_);
+}
+
+// With an empty config, a single server writes and commits a proposal
+// immediately, without any ticks.
+TEST_F(RaftTest, OneLog) {
+  LogEntry config_log_entry;
+  StartUp(1, config_log_entry);
+  auto& raft = *servers_[0]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  // ASSERT sizes so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[0]->log_.size(), 1u);
+  EXPECT_EQ(servers_[0]->log_[0]->data(), "a");
+  ASSERT_EQ(servers_[0]->commits_.size(), 1u);
+  EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+}
+
+// Two proposals on a single server with an empty config are written and
+// committed immediately, in order.
+TEST_F(RaftTest, OneLogLog) {
+  LogEntry config_log_entry;
+  StartUp(1, config_log_entry);
+  auto& raft = *servers_[0]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  // ASSERT sizes so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[0]->log_.size(), 2u);
+  EXPECT_EQ(servers_[0]->log_[0]->data(), "a");
+  EXPECT_EQ(servers_[0]->log_[1]->data(), "b");
+  ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+}
+
+// Two nodes replicate two proposals. This also pins the leader's exact log
+// layout: vote, election, commit, "a", commit, "b", commit.
+TEST_F(RaftTest, OneTwoLogLog) {
+  LogEntry config_log_entry(ConfigLogEntry(2));
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  int iother = ileader ? 0 : 1;
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  Ticks(20);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  // ASSERT sizes so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[ileader]->log_.size(), 7u);
+  EXPECT_NE(servers_[ileader]->log_[0]->vote(), "");    // vote.
+  EXPECT_NE(servers_[ileader]->log_[1]->leader(), "");  // election.
+  EXPECT_EQ(servers_[ileader]->log_[2]->data_committed(),
+            servers_[ileader]->log_[1]->index());
+  EXPECT_EQ(servers_[ileader]->log_[3]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->log_[4]->data_committed(),
+            servers_[ileader]->log_[3]->index());
+  EXPECT_EQ(servers_[ileader]->log_[5]->data(), "b");
+  EXPECT_EQ(servers_[ileader]->log_[6]->data_committed(),
+            servers_[ileader]->log_[5]->index());
+  ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+  ASSERT_EQ(servers_[iother]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[iother]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[iother]->commits_[1]->data(), "b");
+}
+
+// A follower that is down while "b" commits sees only "a". After it
+// reconnects, it should catch up so that all three nodes hold "a", "b".
+TEST_F(RaftTest, OneTwoThreeLogDownLogUp) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  Ticks(20);
+  int downer = (ileader + 1) % 3;
+  down_.insert(to_string(downer));
+  Ticks(20);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  ASSERT_EQ(servers_[downer]->commits_.size(), 1u);
+  EXPECT_EQ(servers_[downer]->commits_[0]->data(), "a");
+  down_.clear();
+  Ticks(20);
+  Ticks(20);
+  for (auto i : {0, 1, 2}) {
+    // ASSERT the size so the indexing below never reads past the end.
+    ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+    EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+    EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+  }
+}
+
+// A follower loses an entry from its durable log and is crash-recovered; it
+// should re-acquire the missing data from its peers.
+TEST_F(RaftTest, OneTwoThreeLogLogThreeDamagedLogRestore) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  Ticks(20);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  int downer = (ileader + 1) % 3;
+  // Lose the "a" commit (the entry at log position 3). Guard the erase so a
+  // shorter log fails the test instead of erasing past the end.
+  ASSERT_GT(servers_[downer]->log_.size(), 3u);
+  servers_[downer]->log_.erase(servers_[downer]->log_.begin() + 3);
+  CrashAndRecover(downer, config_log_entry);
+  Ticks(20);
+  for (auto i : {0, 1, 2}) {
+    // ASSERT the size so the indexing below never reads past the end.
+    ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+    EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+    EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+  }
+}
+
+// After two nodes commit "a" and "b", node 2 starts and the config grows to
+// [0, 1, 2]. Node 2 should receive the full history plus the config change.
+TEST_F(RaftTest, OneTwoLogLogThenThree) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  config_log_entry.mutable_config()->add_node("1");
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  StartUp(1, config_log_entry);  // Start node 2.
+  config_log_entry.mutable_config()->add_node("2");
+  raft.Propose(config_log_entry);  // Change config to [0, 1, 2].
+  Ticks(20);
+  // ASSERT sizes so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[1]->commits_.size(), 3u);
+  EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+  EXPECT_EQ(servers_[1]->commits_[2]->config().node_size(), 3);
+  // Verify that the log is replicated.
+  ASSERT_EQ(servers_[2]->commits_.size(), 3u);
+  EXPECT_EQ(servers_[2]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[2]->commits_[1]->data(), "b");
+  EXPECT_EQ(servers_[2]->commits_[2]->config().node_size(), 3);
+}
+
+// A single server that crashes with its log intact re-commits "a" during
+// recovery.
+TEST_F(RaftTest, OneRecover) {
+  LogEntry config_log_entry;
+  StartUp(1, config_log_entry);
+  {
+    auto& raft = *servers_[0]->raft_.get();
+    LogEntry log_entry;
+    log_entry.set_data("a");
+    raft.Propose(log_entry);
+  }
+  Ticks(20);
+  CrashAndRecover(0, config_log_entry);
+  // ASSERT the size so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[0]->commits_.size(), 1u);
+  EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+}
+
+// The leader loses all state after committing "a" and "b"; once it rejoins,
+// every node (including the wiped one) should hold both commits.
+TEST_F(RaftTest, OneTwoThreeCrashAndBurnLeader) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  config_log_entry.mutable_config()->add_node("1");
+  config_log_entry.mutable_config()->add_node("2");
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+  CrashAndBurn(ileader, config_log_entry);
+  Ticks(20);
+  // Verify that the log is replicated.
+  for (auto i : {0, 1, 2}) {
+    ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+    EXPECT_EQ(servers_[i]->commits_[0]->data(), "a");
+    EXPECT_EQ(servers_[i]->commits_[1]->data(), "b");
+  }
+}
+
+// In a five-node cluster, the leader and one follower crash and recover with
+// their logs intact; both should end up with "a" and "b" committed.
+TEST_F(RaftTest, FiveCrashLeaderAndAnotherAndRecover) {
+  LogEntry config_log_entry(ConfigLogEntry(5));
+  StartUp(5, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+  CrashAndRecover(ileader, config_log_entry);
+  CrashAndRecover((ileader + 1) % 5, config_log_entry);
+  Ticks(20);
+  // Verify that the log is replicated.
+  ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+  ASSERT_EQ(servers_[(ileader + 1) % 5]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[(ileader + 1) % 5]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[(ileader + 1) % 5]->commits_[1]->data(), "b");
+}
+
+// In a five-node cluster, the leader and one follower lose all state. The
+// surviving majority should re-replicate "a" and "b" to both wiped nodes.
+TEST_F(RaftTest, FiveCrashAndBurnLeaderAndAnother) {
+  LogEntry config_log_entry(ConfigLogEntry(5));
+  StartUp(5, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  CrashAndBurn(ileader, config_log_entry);
+  CrashAndBurn((ileader + 1) % 5, config_log_entry);
+  Ticks(20);
+  // Verify that the log is replicated.
+  ASSERT_EQ(servers_[ileader]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[ileader]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[ileader]->commits_[1]->data(), "b");
+  ASSERT_EQ(servers_[(ileader + 1) % 5]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[(ileader + 1) % 5]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[(ileader + 1) % 5]->commits_[1]->data(), "b");
+}
+
+// Test that a log entry from a leader without a quorum is never committed, and
+// that an entry at the same index from a leader with a quorum is.
+TEST_F(RaftTest, FiveLogDown3LogDown2Up3LogUp2) {
+  LogEntry config_log_entry(ConfigLogEntry(5));
+  StartUp(5, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  down_.insert(to_string((ileader + 1) % 5));
+  down_.insert(to_string((ileader + 2) % 5));
+  down_.insert(to_string((ileader + 3) % 5));
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  down_.clear();
+  down_.insert(to_string((ileader + 4) % 5));
+  down_.insert(to_string(ileader));
+  Ticks(20);
+  // The majority side must have elected a leader before we index by it.
+  ASSERT_FALSE(servers_[((ileader + 1) % 5)]->leader_.empty());
+  int ileader2 = servers_[((ileader + 1) % 5)]->leader_[0] - '0';
+  auto& raft2 = *servers_[ileader2]->raft_.get();
+  log_entry.set_data("c");
+  raft2.Propose(log_entry);
+  log_entry.set_data("d");
+  raft2.Propose(log_entry);
+  Ticks(20);
+  down_.clear();
+  Ticks(20);
+  Ticks(20);
+  for (auto i : {0, 1, 2, 3, 4}) {
+    // ASSERT the size so the indexing below never reads past the end.
+    ASSERT_EQ(servers_[i]->commits_.size(), 2u);
+    EXPECT_EQ(servers_[i]->commits_[0]->data(), "c");
+    EXPECT_EQ(servers_[i]->commits_[1]->data(), "d");
+  }
+}
+
+// Node 0 leads with node 1 as a replica. Both restart under a swapped config
+// (node 1, replica 0): 0 with no state and 1 with its log. Node 1 should
+// lead and both should hold "a" and "b".
+TEST_F(RaftTest, ReplicaFailover) {
+  LogEntry config_log_entry;
+  config_log_entry.mutable_config()->add_node("0");
+  config_log_entry.mutable_config()->add_replica("1");
+  StartUp(2, config_log_entry);
+  Ticks(20);
+  auto& raft = *servers_[0]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a");
+  raft.Propose(log_entry);
+  log_entry.set_data("b");
+  raft.Propose(log_entry);
+  Ticks(20);
+  // ASSERT sizes so the indexing below never reads past the end.
+  ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+  ASSERT_EQ(servers_[1]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+  EXPECT_EQ(servers_[0]->leader_, "0");
+  EXPECT_EQ(servers_[1]->leader_, "0");
+  config_log_entry.mutable_config()->clear_node();
+  config_log_entry.mutable_config()->clear_replica();
+  config_log_entry.mutable_config()->add_node("1");
+  config_log_entry.mutable_config()->add_replica("0");
+  CrashAndBurn(0, config_log_entry);
+  CrashAndRecover(1, config_log_entry);
+  Ticks(20);
+  // Verify that the log is replicated.
+  ASSERT_EQ(servers_[0]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[0]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[0]->commits_[1]->data(), "b");
+  ASSERT_EQ(servers_[1]->commits_.size(), 2u);
+  EXPECT_EQ(servers_[1]->commits_[0]->data(), "a");
+  EXPECT_EQ(servers_[1]->commits_[1]->data(), "b");
+  EXPECT_EQ(servers_[0]->leader_, "1");
+  EXPECT_EQ(servers_[1]->leader_, "1");
+}
+
+// A single server applies a=1, b=2, then b=3 to its key/value state. After a
+// snapshot-based crash and recovery, the latest values a=1, b=3 must be back.
+TEST_F(RaftTest, OneSnapshotTwo) {
+  LogEntry config_log_entry;
+  StartUp(1, config_log_entry);
+  auto& raft = *servers_[0]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a=1");
+  raft.Propose(log_entry);
+  log_entry.set_data("b=2");
+  raft.Propose(log_entry);
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[0]->state_["b"].second, "2");
+  log_entry.set_data("b=3");
+  raft.Propose(log_entry);
+  Ticks(20);
+  EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+  SnapshotCrashAndRecover(0, config_log_entry);
+  Ticks(20);
+  // Verify that the state is restored.
+  EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+}
+
+// Nodes 0 and 1 recover from snapshots and node 2 loses everything. Every node
+// should end up with the latest key/value state a=1, b=3.
+TEST_F(RaftTest, OneTwoThreeSnapshotOneTwoCrashAndBurnThree) {
+  LogEntry config_log_entry(ConfigLogEntry(3));
+  StartUp(3, config_log_entry);
+  Ticks(20);
+  // Fail cleanly if no leader was elected (leader_[0] would be '\0').
+  ASSERT_FALSE(servers_[0]->leader_.empty());
+  int ileader = servers_[0]->leader_[0] - '0';
+  auto& raft = *servers_[ileader]->raft_.get();
+  LogEntry log_entry;
+  log_entry.set_data("a=1");
+  raft.Propose(log_entry);
+  log_entry.set_data("b=2");
+  raft.Propose(log_entry);
+  Ticks(20);
+  log_entry.set_data("b=3");
+  raft.Propose(log_entry);
+  Ticks(20);
+  SnapshotCrashAndRecover(0, config_log_entry);
+  SnapshotCrashAndRecover(1, config_log_entry);
+  CrashAndBurn(2, config_log_entry);
+  Ticks(20);
+  // Verify that the state is restored on every node, including server 1,
+  // which was also snapshot-recovered.
+  EXPECT_EQ(servers_[0]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[0]->state_["b"].second, "3");
+  EXPECT_EQ(servers_[1]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[1]->state_["b"].second, "3");
+  EXPECT_EQ(servers_[2]->state_["a"].second, "1");
+  EXPECT_EQ(servers_[2]->state_["b"].second, "3");
+}
+}  // namespace raft

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/38e1d741/lib/raft/test_makefile
----------------------------------------------------------------------
diff --git a/lib/raft/test_makefile b/lib/raft/test_makefile
new file mode 100644
index 0000000..4520307
--- /dev/null
+++ b/lib/raft/test_makefile
@@ -0,0 +1,140 @@
+#OPTIMIZE=1
+DEBUG=1
+#PROFILE=1
+PREFIX=/usr/local
+OUT_DIR=.
+
+# Use default if no configuration specified.
+# (CXX and AR have GNU make built-in defaults, so these ?= lines only take
+# effect if those defaults have been undefined.)
+CXX ?= g++
+#CXX = clang
+AR ?= ar
+
+PROTOC = protoc
+# Prepend the current directory (where --python_out writes *_pb2.py) to the
+# caller's PYTHONPATH.
+PYTHONPATH := .:$(PYTHONPATH)
+
+.PHONY: all default depend install test clean
+
+# Compiler flags selected by the OPTIMIZE / DEBUG / PROFILE switches above.
+ifdef OPTIMIZE
+CXXFLAGS += -O3
+endif
+ifdef DEBUG
+CXXFLAGS += -ggdb
+endif
+ifdef PROFILE
+CXXFLAGS += -O3 -g
+endif
+
+# Host detection. OS_TYPE is `uname -s` up to the first "_" (e.g. "CYGWIN").
+# OS_VERSION concatenates the first three numeric parts of `uname -r` after
+# dropping a "V" from the first (not referenced elsewhere in this makefile).
+OS_TYPE = $(shell uname -s | \
+	    awk '{ split($$1,a,"_"); printf("%s", a[1]);  }')
+OS_VERSION = $(shell uname -r | \
+	       awk '{ split($$1,a,"."); sub("V","",a[1]); \
+	         printf("%d%d%d",a[1],a[2],a[3]); }')
+# ARCH is `uname -m`, with the 32-bit x86 variants folded into "x86".
+ARCH = $(shell uname -m)
+ifeq ($(ARCH),i386)
+  ARCH = x86
+endif
+ifeq ($(ARCH),i486)
+  ARCH = x86
+endif
+ifeq ($(ARCH),i586)
+  ARCH = x86
+endif
+ifeq ($(ARCH),i686)
+  ARCH = x86
+endif
+
+# Refuse 32-bit x86 builds. Darwin is exempt, presumably because its
+# `uname -m` may report i386 on 64-bit-capable machines.
+ifeq ($(ARCH),x86)
+ifneq ($(OS_TYPE),Darwin)
+# Darwin lies
+$(error 64-bit required)
+endif
+endif
+# Archiver flags; on Darwin "s" is added to also write the symbol index.
+ifeq ($(OS_TYPE),Darwin)
+  ARFLAGS = crvs
+else
+  ARFLAGS = crv
+endif
+
+# Link librt and libpthread everywhere except Cygwin and Darwin.
+ifneq ($(OS_TYPE),CYGWIN)
+ifneq ($(OS_TYPE),Darwin)
+  LIBS += -lrt -lpthread
+endif
+endif
+
+
+# Function for getting a set of source files.
+# Lists *.c, *.cpp and *.cc under $(1), excluding *_test.cc (find's -a binds
+# tighter than -or, so the exclusion applies to the *.cc alternative, the only
+# one it can match). NOTE(review): not referenced in this makefile.
+get_srcs = $(shell find $(1) -name \*.c -or -name \*.cpp -or -name \*.cc -a ! -name \*_test.cc | tr "\n" " ")
+
+# Protobuf inputs and generated outputs, and the *_test.cc programs
+# (one binary per test source, named without the extension).
+PROTOS = raft
+PROTO_SOURCES = $(addsuffix .pb.cc,$(PROTOS))
+PROTO_INCLUDES = $(addsuffix .pb.h,$(PROTOS))
+PROTO_PYTHON = $(addsuffix _pb2.py,$(PROTOS))
+TEST_SOURCES = $(shell find . -name \*_test.cc)
+TESTS = $(basename $(TEST_SOURCES))
+SOURCES = $(PROTO_SOURCES) $(TEST_SOURCES)
+
+# Location of gtest headers and of the prebuilt static libgtest.a /
+# libgtest_main.a linked by the %_test rule.
+GTEST_DIR = /usr/include/gtest
+GTEST_LIB = /usr/lib
+
+# OPT may be given on the command line for extra flags. The backticked
+# pkg-config calls are left to the shell to expand when each recipe runs.
+CXXFLAGS += $(OPT)
+CXXFLAGS += -std=c++11
+CXXFLAGS += -I. -I./include -I./util -I./common -I$(GTEST_DIR) \
+	    -I/usr/local/include -I/usr/include
+CXXFLAGS += `pkg-config --cflags protobuf`
+
+LIBS += -L/usr/local/lib/ -L/usr/lib -lgflags
+#LIBS += -L/usr/local/lib/ -L/usr/lib
+LIBS += `pkg-config --libs protobuf`
+
+# Map source file names to their .o / .d counterparts.
+source_to_object = $(addsuffix .o,$(basename $(1)))
+source_to_depend = $(addsuffix .d,$(basename $(1)))
+
+default: all
+
+all: $(TESTS)
+
+depend: $(call source_to_depend,$(SOURCES))
+	@echo Building $(call source_to_depend,$(SOURCES))
+
+# Scratch directory given to each test binary and recreated for every test.
+# Falls back to a local directory when unset or empty, so the recipe never
+# runs `rm -rf` / `mkdir` without an argument.
+ifeq ($(strip $(TEST_TMPDIR)),)
+  TEST_TMPDIR := $(CURDIR)/test_tmp
+endif
+
+# Builds and runs every *_test binary, stopping at the first failure.
+test: $(TESTS)
+	for t in $^; \
+		do \
+			echo "***** Running $$t"; \
+			rm -rf "$(TEST_TMPDIR)"; \
+			mkdir -p "$(TEST_TMPDIR)"; \
+			./$$t --test_tmpdir="$(TEST_TMPDIR)" || exit 1; \
+		done; \
+	rm -rf "$(TEST_TMPDIR)"
+	@echo "All tests pass!"
+
+clean:
+	-rm -f `find . -type f -name '*.pb.*'`
+	-rm -f `find . -type f -name '*_pb2.py'`
+	-rm -f `find . -type f -name '*.o'`
+	-rm -f `find . -type f -name '*.pyc'`
+	-rm -f `find . -type f -name '*.d'`
+	-rm -f $(PROTO_INCLUDES) $(PROTO_SOURCES) $(PROTO_PYTHON)
+	-rm -f $(TESTS)
+
+# Each test links its own source with the generated protobuf object and the
+# static gtest libraries (gtest_main supplies main()).
+%_test: raft.pb.o %_test.cc
+	$(CXX) -o $@ $*_test.cc $(CXXFLAGS) raft.pb.o $(LIBS) $(GTEST_LIB)/libgtest.a $(GTEST_LIB)/libgtest_main.a
+
+%.pb.cc %.pb.h %_pb2.py: %.proto
+	$(PROTOC) $^ --cpp_out=. --python_out=.
+
+%.o: %.cc $(MODULE_INCLUDES) $(CODEGEN_FILES)
+	$(CXX) -c $*.cc -o $(OUT_DIR)/$@ $(CXXFLAGS)
+
+%.d: %.cc $(MODULE_INCLUDES) $(CODEGEN_FILES)
+	$(CXX) -MM $*.cc $(CXXFLAGS) > $*.d
+
+%.cc: %.lex
+	flex -o $@ $^
+
+%.cc %.h: %.y
+	bison --output=$*.cc --defines=$*.h $^
+
+
+ifneq ($(MAKECMDGOALS),clean)
+  -include $(call source_to_depend,$(SOURCES))
+endif