You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2015/07/22 16:30:51 UTC

trafficserver git commit: TS-3786 Missed one clang-format case :)

Repository: trafficserver
Updated Branches:
  refs/heads/master db4029a2f -> 6448a82b4


TS-3786 Missed one clang-format case :)


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/6448a82b
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/6448a82b
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/6448a82b

Branch: refs/heads/master
Commit: 6448a82b477745407d5efc1c982c9d4f5e8f2179
Parents: db4029a
Author: Leif Hedstrom <zw...@apache.org>
Authored: Wed Jul 22 09:30:05 2015 -0500
Committer: Leif Hedstrom <zw...@apache.org>
Committed: Wed Jul 22 09:30:05 2015 -0500

----------------------------------------------------------------------
 lib/raft/raft_impl.h | 1028 +++++++++++++++++++++++----------------------
 1 file changed, 515 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/6448a82b/lib/raft/raft_impl.h
----------------------------------------------------------------------
diff --git a/lib/raft/raft_impl.h b/lib/raft/raft_impl.h
index 56d5168..1ed1dd6 100644
--- a/lib/raft/raft_impl.h
+++ b/lib/raft/raft_impl.h
@@ -33,587 +33,589 @@
 
 #include "raft.h"
 
-  namespace raft
+namespace raft
 {
-  template <typename Server> class RaftImpl : public Raft<Server>
+template <typename Server> class RaftImpl : public Raft<Server>
+{
+public:
+  typedef typename Server::Message Message;
+  typedef typename Server::LogEntry LogEntry;
+  typedef typename Server::Config Config;
+
+  RaftImpl(Server *server, const ::std::string &node) : server_(server), node_(node) {}
+  ~RaftImpl() {}
+
+  virtual void
+  SetElectionTimeout(double timeout)
   {
-  public:
-    typedef typename Server::Message Message;
-    typedef typename Server::LogEntry LogEntry;
-    typedef typename Server::Config Config;
-
-    RaftImpl(Server *server, const ::std::string &node) : server_(server), node_(node) {}
-    ~RaftImpl() {}
-
-    virtual void
-    SetElectionTimeout(double timeout)
-    {
-      election_timeout_ = timeout;
-    }
+    election_timeout_ = timeout;
+  }
 
-    virtual void
-    Recover(const LogEntry &e)
-    {
-      if (!e.has_term()) {   // LogEntry from server.
-        if (e.has_index()) { // Summary log entry.
-          ProcessLogEntry(e, true);
-          Commit(true);
-        } else if (e.has_config()) {
-          config_.CopyFrom(e.config());
-          ConfigChanged();
-        }
-      } else { // LogEntry has passed through Raft.
-        if (e.term() > term_)
-          NewTerm(e.term(), e.leader(), true);
-        if (e.has_config_committed())
-          config_committed_ = e.config_committed();
-        if (e.has_data_committed())
-          data_committed_ = e.data_committed();
+  virtual void
+  Recover(const LogEntry &e)
+  {
+    if (!e.has_term()) {   // LogEntry from server.
+      if (e.has_index()) { // Summary log entry.
         ProcessLogEntry(e, true);
         Commit(true);
+      } else if (e.has_config()) {
+        config_.CopyFrom(e.config());
+        ConfigChanged();
       }
+    } else { // LogEntry has passed through Raft.
+      if (e.term() > term_)
+        NewTerm(e.term(), e.leader(), true);
+      if (e.has_config_committed())
+        config_committed_ = e.config_committed();
+      if (e.has_data_committed())
+        data_committed_ = e.data_committed();
+      ProcessLogEntry(e, true);
+      Commit(true);
     }
+  }
 
-    virtual void
-    Start(double now, int64_t seed)
-    {
-      last_heartbeat_ = now;
-      srand48_r(seed, &rand_);
+  virtual void
+  Start(double now, int64_t seed)
+  {
+    last_heartbeat_ = now;
+    srand48_r(seed, &rand_);
+    double r = 0.0;
+    drand48_r(&rand_, &r);
+    random_election_delay_ = election_timeout_ * r;
+    if (ConfigChanged())
+      NewTerm(term_ + 1, leader_, true);
+    else
+      vote_ = node_; // Conservatively assume we called a vote for ourself.
+    server_->ConfigChange(this, config_);
+    server_->LeaderChange(this, leader_);
+  }
+
+  virtual void
+  Tick(double now)
+  {
+    if (i_am_in_nodes() && !other_nodes_.empty() && now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
       double r = 0.0;
       drand48_r(&rand_, &r);
       random_election_delay_ = election_timeout_ * r;
-      if (ConfigChanged())
-        NewTerm(term_ + 1, leader_, true);
-      else
-        vote_ = node_; // Conservatively assume we called a vote for ourself.
-      server_->ConfigChange(this, config_);
-      server_->LeaderChange(this, leader_);
+      last_heartbeat_ = now;
+      VoteForMe();
+      return;
     }
-
-    virtual void
-    Tick(double now)
-    {
-      if (i_am_in_nodes() && !other_nodes_.empty() && now - last_heartbeat_ > election_timeout_ + random_election_delay_) {
-        double r = 0.0;
-        drand48_r(&rand_, &r);
-        random_election_delay_ = election_timeout_ * r;
-        last_heartbeat_ = now;
-        VoteForMe();
-        return;
-      }
-      // Send heartbeats at 1/4 of timeout to allow for lost
-      // packets/connections.
-      if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
-        last_heartbeat_sent_ = now;
-        ReplicateAll(true);
-      }
+    // Send heartbeats at 1/4 of timeout to allow for lost
+    // packets/connections.
+    if (i_am_leader() && now - last_heartbeat_sent_ > election_timeout_ / 4) {
+      last_heartbeat_sent_ = now;
+      ReplicateAll(true);
     }
+  }
 
-    virtual void
-    Propose(const LogEntry &e)
-    {
-      assert(i_am_leader());
-      LogEntry entry(e);
-      entry.set_term(term_);
-      entry.set_index(index_ + 1);
-      entry.set_previous_log_term(last_log_term_);
-      entry.set_previous_log_index(index_);
-      ProcessLogEntry(entry, false);
-      ReplicateAll(false);
-      Commit(false);
-    }
+  virtual void
+  Propose(const LogEntry &e)
+  {
+    assert(i_am_leader());
+    LogEntry entry(e);
+    entry.set_term(term_);
+    entry.set_index(index_ + 1);
+    entry.set_previous_log_term(last_log_term_);
+    entry.set_previous_log_index(index_);
+    ProcessLogEntry(entry, false);
+    ReplicateAll(false);
+    Commit(false);
+  }
 
-    virtual void
-    Run(double now, const Message &m)
-    {
-      if (m.term() >= term_)
-        seen_term_ = true;
-      if (m.term() < term_)
-        return; // Ignore messages from terms gone by.
-      if (m.term() > term_)
-        NewTerm(m.term(), m.leader(), false);
-      if (m.leader() != "" && leader_ != m.leader() && other_nodes_.count(m.from())) { // Only from nodes I acknowledge.
-        leader_ = m.leader();
-        server_->LeaderChange(this, leader_);
-      }
-      auto &n = node_state_[m.from()];
-      if (n.term != m.term()) {
-        n.term = m.term();
-        n.vote = "";
-      }
-      n.term = term_;
-      n.last_log_term = m.last_log_term();
-      n.last_log_index = m.last_log_index();
-      if (m.from() != leader_ || m.has_vote()) {
-        HandleAck(now, m, &n);
-        if (m.has_vote())
-          HandleVote(m, &n);
-        return;
-      }
-      last_heartbeat_ = now;
-      if (m.config_committed() > config_committed_ || m.data_committed() > data_committed_) {
-        config_committed_ = m.config_committed();
-        data_committed_ = m.data_committed();
-        WriteInternalLogEntry();
-      }
-      if (m.has_entry())
-        Ack(ProcessLogEntry(m.entry(), false));
-      else
-        Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
-      Commit(false);
+  virtual void
+  Run(double now, const Message &m)
+  {
+    if (m.term() >= term_)
+      seen_term_ = true;
+    if (m.term() < term_)
+      return; // Ignore messages from terms gone by.
+    if (m.term() > term_)
+      NewTerm(m.term(), m.leader(), false);
+    if (m.leader() != "" && leader_ != m.leader() && other_nodes_.count(m.from())) { // Only from nodes I acknowledge.
+      leader_ = m.leader();
+      server_->LeaderChange(this, leader_);
     }
-
-    virtual void
-    Snapshot(bool uncommitted, ::std::vector<LogEntry> *entries)
-    {
-      entries->clear();
-      LogEntry config_e;
-      config_e.set_term(config_.term());
-      config_e.set_index(config_.index());
-      config_e.set_vote(vote_);
-      config_e.set_data_committed(data_committed_);
-      config_e.set_config_committed(config_committed_);
-      config_e.mutable_config()->CopyFrom(config_);
-      entries->push_back(config_e);
-      if (pending_config_.has_term() && (!waiting_commits_.size() || // If it isn't in the waiting_commits.
-                                         waiting_commits_.front()->index() > pending_config_.index())) {
-        LogEntry pending_e;
-        pending_e.set_term(pending_config_.term());
-        pending_e.set_index(pending_config_.index());
-        pending_e.mutable_config()->CopyFrom(pending_config_);
-        entries->push_back(pending_e);
-      }
-      if (uncommitted)
-        for (auto &e : waiting_commits_)
-          entries->push_back(*e);
+    auto &n = node_state_[m.from()];
+    if (n.term != m.term()) {
+      n.term = m.term();
+      n.vote = "";
+    }
+    n.term = term_;
+    n.last_log_term = m.last_log_term();
+    n.last_log_index = m.last_log_index();
+    if (m.from() != leader_ || m.has_vote()) {
+      HandleAck(now, m, &n);
+      if (m.has_vote())
+        HandleVote(m, &n);
+      return;
     }
+    last_heartbeat_ = now;
+    if (m.config_committed() > config_committed_ || m.data_committed() > data_committed_) {
+      config_committed_ = m.config_committed();
+      data_committed_ = m.data_committed();
+      WriteInternalLogEntry();
+    }
+    if (m.has_entry())
+      Ack(ProcessLogEntry(m.entry(), false));
+    else
+      Ack(m.last_log_index() == index_ && m.last_log_term() == last_log_term_);
+    Commit(false);
+  }
 
-    virtual void
-    Stop()
-    {
-      Abdicate();
+  virtual void
+  Snapshot(bool uncommitted, ::std::vector<LogEntry> *entries)
+  {
+    entries->clear();
+    LogEntry config_e;
+    config_e.set_term(config_.term());
+    config_e.set_index(config_.index());
+    config_e.set_vote(vote_);
+    config_e.set_data_committed(data_committed_);
+    config_e.set_config_committed(config_committed_);
+    config_e.mutable_config()->CopyFrom(config_);
+    entries->push_back(config_e);
+    if (pending_config_.has_term() && (!waiting_commits_.size() || // If it isn't in the waiting_commits.
+                                       waiting_commits_.front()->index() > pending_config_.index())) {
+      LogEntry pending_e;
+      pending_e.set_term(pending_config_.term());
+      pending_e.set_index(pending_config_.index());
+      pending_e.mutable_config()->CopyFrom(pending_config_);
+      entries->push_back(pending_e);
     }
+    if (uncommitted)
+      for (auto &e : waiting_commits_)
+        entries->push_back(*e);
+  }
+
+  virtual void
+  Stop()
+  {
+    Abdicate();
+  }
+
+private:
+  struct NodeState {
+    int64_t term = -1;
+    int64_t sent_term = 0;
+    int64_t sent_index = 0;
+    int64_t last_log_term = -1;
+    int64_t last_log_index = -1;
+    double ack_received = -1.0e10;
+    ::std::string vote;
+  };
+
+  Message
+  InitializeMessage()
+  {
+    Message m;
+    m.set_term(term_);
+    m.set_last_log_term(last_log_term_);
+    m.set_last_log_index(index_);
+    m.set_from(node_);
+    m.set_leader(leader_);
+    m.set_data_committed(data_committed_);
+    m.set_config_committed(config_committed_);
+    return m;
+  }
 
-  private:
-    struct NodeState {
-      int64_t term = -1;
-      int64_t sent_term = 0;
-      int64_t sent_index = 0;
-      int64_t last_log_term = -1;
-      int64_t last_log_index = -1;
-      double ack_received = -1.0e10;
-      ::std::string vote;
-    };
-
-    Message
-    InitializeMessage()
-    {
-      Message m;
-      m.set_term(term_);
-      m.set_last_log_term(last_log_term_);
-      m.set_last_log_index(index_);
-      m.set_from(node_);
-      m.set_leader(leader_);
-      m.set_data_committed(data_committed_);
-      m.set_config_committed(config_committed_);
-      return m;
+  void
+  NewTerm(int64_t term, const ::std::string new_leader, bool in_recovery)
+  {
+    vote_ = "";
+    term_ = term;
+    leader_ = new_leader;
+    waiting_commits_.clear();
+    if (!in_recovery) {
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
     }
+  }
 
-    void
-    NewTerm(int64_t term, const ::std::string new_leader, bool in_recovery)
-    {
-      vote_ = "";
-      term_ = term;
-      leader_ = new_leader;
+  void
+  VoteForMe()
+  {
+    if (seen_term_ || leader_ != "" || vote_ != node_) {
+      vote_ = node_;
+      term_++;
+      leader_ = "";
       waiting_commits_.clear();
-      if (!in_recovery) {
-        WriteInternalLogEntry();
-        server_->LeaderChange(this, leader_);
-      }
+      WriteInternalLogEntry();
+      server_->LeaderChange(this, leader_);
+      seen_term_ = false;
     }
+    Vote();
+  }
+
+  void
+  Vote()
+  {
+    Message m(InitializeMessage());
+    m.set_vote(vote_);
+    if (vote_ == node_)
+      SendToReplicas(m);
+    else
+      server_->SendMessage(this, vote_, m);
+  }
 
-    void
-    VoteForMe()
-    {
-      if (seen_term_ || leader_ != "" || vote_ != node_) {
-        vote_ = node_;
-        term_++;
-        leader_ = "";
-        waiting_commits_.clear();
+  void
+  HandleVote(const Message &m, NodeState *n)
+  {
+    n->vote = m.vote();
+    if (vote_.empty()) {       // I have not voted yet.
+      if (m.vote() == node_) { // Abdication.
+        VoteForMe();
+      } else if (m.last_log_term() >= last_log_term_ && m.last_log_index() >= index_) {
+        // Vote for candidate if it is at least as up to date as we are.
+        vote_ = m.vote();
+        WriteInternalLogEntry();
+        Vote();
+      }
+    } else if (vote_ == node_ && node_ == n->vote) {
+      int votes = 0;
+      for (auto &o : other_config_nodes_) {
+        auto &s = node_state_[o];
+        if (s.term == term_ && s.vote == node_)
+          votes++;
+      }
+      if (votes + 1 > (other_config_nodes_.size() + 1) / 2) {
+        leader_ = node_;
         WriteInternalLogEntry();
         server_->LeaderChange(this, leader_);
-        seen_term_ = false;
+        HeartBeat(); // Inform the others.
       }
-      Vote();
     }
+  }
 
-    void
-    Vote()
-    {
-      Message m(InitializeMessage());
-      m.set_vote(vote_);
-      if (vote_ == node_)
-        SendToReplicas(m);
-      else
-        server_->SendMessage(this, vote_, m);
+  void
+  Ack(bool ack)
+  {
+    Message m(InitializeMessage());
+    if (!ack) { // Reset local log state to last committed.
+      m.set_nack(true);
+      m.set_last_log_term(last_log_committed_term_);
+      m.set_last_log_index(last_log_committed_index_);
+      index_ = last_log_committed_index_;
+      last_log_term_ = last_log_committed_term_;
     }
+    server_->SendMessage(this, leader_, m);
+  }
 
-    void
-    HandleVote(const Message &m, NodeState *n)
-    {
-      n->vote = m.vote();
-      if (vote_.empty()) {       // I have not voted yet.
-        if (m.vote() == node_) { // Abdication.
-          VoteForMe();
-        } else if (m.last_log_term() >= last_log_term_ && m.last_log_index() >= index_) {
-          // Vote for candidate if it is at least as up to date as we are.
-          vote_ = m.vote();
-          WriteInternalLogEntry();
-          Vote();
-        }
-      } else if (vote_ == node_ && node_ == n->vote) {
-        int votes = 0;
-        for (auto &o : other_config_nodes_) {
-          auto &s = node_state_[o];
-          if (s.term == term_ && s.vote == node_)
-            votes++;
-        }
-        if (votes + 1 > (other_config_nodes_.size() + 1) / 2) {
-          leader_ = node_;
-          WriteInternalLogEntry();
-          server_->LeaderChange(this, leader_);
-          HeartBeat(); // Inform the others.
-        }
-      }
+  void
+  HandleAck(double now, const Message &m, NodeState *n)
+  {
+    n->ack_received = now;
+    if (m.nack()) {
+      n->sent_index = n->last_log_index;
+      n->sent_term = n->last_log_term;
+    } else if (i_am_leader()) {
+      int acks_needed = (other_nodes_.size() + 1) / 2;
+      for (auto &o : other_nodes_)
+        if (node_state_[o].ack_received >= last_heartbeat_sent_)
+          acks_needed--;
+      if (acks_needed <= 0)
+        last_heartbeat_ = now;
+      UpdateCommitted();
     }
+  }
 
-    void
-    Ack(bool ack)
-    {
-      Message m(InitializeMessage());
-      if (!ack) { // Reset local log state to last committed.
-        m.set_nack(true);
-        m.set_last_log_term(last_log_committed_term_);
-        m.set_last_log_index(last_log_committed_index_);
-        index_ = last_log_committed_index_;
-        last_log_term_ = last_log_committed_term_;
-      }
-      server_->SendMessage(this, leader_, m);
-    }
+  void
+  HeartBeat()
+  {
+    Message m(InitializeMessage());
+    SendToReplicas(m);
+  }
+
+  void
+  SendToReplicas(const Message &m)
+  {
+    for (auto &n : replicas_)
+      server_->SendMessage(this, n, m);
+  }
 
-    void
-    HandleAck(double now, const Message &m, NodeState *n)
-    {
-      n->ack_received = now;
-      if (m.nack()) {
-        n->sent_index = n->last_log_index;
-        n->sent_term = n->last_log_term;
-      } else if (i_am_leader()) {
-        int acks_needed = (other_nodes_.size() + 1) / 2;
-        for (auto &o : other_nodes_)
-          if (node_state_[o].ack_received >= last_heartbeat_sent_)
-            acks_needed--;
-        if (acks_needed <= 0)
-          last_heartbeat_ = now;
-        UpdateCommitted();
+  void
+  Abdicate()
+  {
+    if (!i_am_leader())
+      return;
+    // Attempt to pass leadership to a worthy successor.
+    const ::std::string *best_node = nullptr;
+    NodeState *best = nullptr;
+    for (auto &n : other_nodes_) {
+      auto &s = node_state_[n];
+      if (!best || (s.last_log_term > best->last_log_term ||
+                    (s.last_log_term == best->last_log_term && s.last_log_index > best->last_log_index))) {
+        best_node = &n;
+        best = &s;
       }
     }
-
-    void
-    HeartBeat()
-    {
+    if (best_node) {
+      term_++;
+      leader_ = "";
+      vote_ = *best_node;
+      WriteInternalLogEntry();
       Message m(InitializeMessage());
-      SendToReplicas(m);
-    }
-
-    void
-    SendToReplicas(const Message &m)
-    {
-      for (auto &n : replicas_)
-        server_->SendMessage(this, n, m);
+      m.set_vote(vote_);
+      server_->SendMessage(this, vote_, m);
     }
+  }
 
-    void
-    Abdicate()
-    {
-      if (!i_am_leader())
-        return;
-      // Attempt to pass leadership to a worthy successor.
-      const ::std::string *best_node = nullptr;
-      NodeState *best = nullptr;
-      for (auto &n : other_nodes_) {
-        auto &s = node_state_[n];
-        if (!best || (s.last_log_term > best->last_log_term ||
-                      (s.last_log_term == best->last_log_term && s.last_log_index > best->last_log_index))) {
-          best_node = &n;
-          best = &s;
-        }
-      }
-      if (best_node) {
-        term_++;
-        leader_ = "";
-        vote_ = *best_node;
-        WriteInternalLogEntry();
-        Message m(InitializeMessage());
-        m.set_vote(vote_);
-        server_->SendMessage(this, vote_, m);
-      }
-    }
+  void
+  WriteInternalLogEntry()
+  {
+    LogEntry e;
+    e.set_term(term_);
+    e.set_leader(leader_);
+    e.set_vote(vote_);
+    e.set_data_committed(data_committed_);
+    e.set_config_committed(config_committed_);
+    server_->WriteLogEntry(this, e);
+  }
 
-    void
-    WriteInternalLogEntry()
-    {
-      LogEntry e;
-      e.set_term(term_);
-      e.set_leader(leader_);
-      e.set_vote(vote_);
-      e.set_data_committed(data_committed_);
-      e.set_config_committed(config_committed_);
-      server_->WriteLogEntry(this, e);
+  bool
+  ProcessLogEntry(const LogEntry &e, bool in_recovery)
+  {
+    if (e.has_config()) {
+      pending_config_.CopyFrom(e.config());
+      pending_config_.set_term(e.term());
+      pending_config_.set_index(e.index());
+      ConfigChanged();
     }
-
-    bool
-    ProcessLogEntry(const LogEntry &e, bool in_recovery)
-    {
-      if (e.has_config()) {
-        pending_config_.CopyFrom(e.config());
-        pending_config_.set_term(e.term());
-        pending_config_.set_index(e.index());
-        ConfigChanged();
+    if (e.has_index()) { // Not an internal entry.
+      std::unique_ptr<LogEntry> entry(new LogEntry(e));
+      if (e.index() <= index_)
+        return true;            // Already seen this.
+      if (!entry->has_term()) { // Summary, fill in the internal bits.
+        entry->set_term(term_);
+        index_ = entry->index() - 1; // Summary need not have an extent().
+        entry->set_previous_log_term(last_log_term_);
+        entry->set_previous_log_index(index_);
       }
-      if (e.has_index()) { // Not an internal entry.
-        std::unique_ptr<LogEntry> entry(new LogEntry(e));
-        if (e.index() <= index_)
-          return true;            // Already seen this.
-        if (!entry->has_term()) { // Summary, fill in the internal bits.
-          entry->set_term(term_);
-          index_ = entry->index() - 1; // Summary need not have an extent().
-          entry->set_previous_log_term(last_log_term_);
-          entry->set_previous_log_index(index_);
-        }
-        if (e.term() < last_log_term_)
-          return true; // Already seen this.
-        if (e.term() == last_log_term_ && e.index() <= index_)
-          return true;
-        if ((entry->previous_log_term() != last_log_term_ || entry->previous_log_index() != index_))
-          return false; // Out of sequence.
-        if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
-          return false; // Out of sequence.
-        last_log_term_ = entry->term();
-        index_ = entry->index() + entry->extent();
-        if (!in_recovery && i_am_leader()) {
-          if (!other_nodes_.size())
-            data_committed_ = index_;
-          if (!other_config_nodes_.size())
-            config_committed_ = index_;
-        }
-        entry->set_data_committed(data_committed_);
-        entry->set_config_committed(config_committed_);
-        if (!in_recovery)
-          server_->WriteLogEntry(this, *entry);
-        waiting_commits_.emplace_back(entry.release());
+      if (e.term() < last_log_term_)
+        return true; // Already seen this.
+      if (e.term() == last_log_term_ && e.index() <= index_)
+        return true;
+      if ((entry->previous_log_term() != last_log_term_ || entry->previous_log_index() != index_))
+        return false; // Out of sequence.
+      if (last_log_term_ == entry->term() && entry->index() != index_ + 1)
+        return false; // Out of sequence.
+      last_log_term_ = entry->term();
+      index_ = entry->index() + entry->extent();
+      if (!in_recovery && i_am_leader()) {
+        if (!other_nodes_.size())
+          data_committed_ = index_;
+        if (!other_config_nodes_.size())
+          config_committed_ = index_;
       }
-      return true;
+      entry->set_data_committed(data_committed_);
+      entry->set_config_committed(config_committed_);
+      if (!in_recovery)
+        server_->WriteLogEntry(this, *entry);
+      waiting_commits_.emplace_back(entry.release());
     }
+    return true;
+  }
 
-    int
-    MajorityIndex(const ::std::set< ::std::string> &other)
-    {
-      ::std::vector<int64_t> indices(1, index_);
-      for (auto &o : other)
-        indices.push_back(node_state_[o].last_log_index);
-      sort(indices.begin(), indices.end());
-      return indices[indices.size() / 2];
-    }
+  int
+  MajorityIndex(const ::std::set< ::std::string> &other)
+  {
+    ::std::vector<int64_t> indices(1, index_);
+    for (auto &o : other)
+      indices.push_back(node_state_[o].last_log_index);
+    sort(indices.begin(), indices.end());
+    return indices[indices.size() / 2];
+  }
 
-    void
-    UpdateCommitted()
-    {
-      int i = MajorityIndex(other_nodes_);
-      if (i > data_committed_) {
-        data_committed_ = i;
+  void
+  UpdateCommitted()
+  {
+    int i = MajorityIndex(other_nodes_);
+    if (i > data_committed_) {
+      data_committed_ = i;
+      WriteInternalLogEntry();
+      Commit(false);
+      HeartBeat();
+    }
+    if (pending_config_.has_term()) { // If a pending configuration change.
+      int ci = MajorityIndex(other_config_nodes_);
+      // config_committed must be <= data_committed, so the new
+      // configuration must also concur with the new data_committed.
+      if (i == ci && ci > config_committed_) {
+        config_committed_ = ci;
         WriteInternalLogEntry();
         Commit(false);
         HeartBeat();
-      }
-      if (pending_config_.has_term()) { // If a pending configuration change.
-        int ci = MajorityIndex(other_config_nodes_);
-        // config_committed must be <= data_committed, so the new
-        // configuration must also concur with the new data_committed.
-        if (i == ci && ci > config_committed_) {
-          config_committed_ = ci;
-          WriteInternalLogEntry();
-          Commit(false);
-          HeartBeat();
-          if (!i_am_leader() && other_nodes_.size() > 1)
-            Abdicate();
-        }
+        if (!i_am_leader() && other_nodes_.size() > 1)
+          Abdicate();
       }
     }
+  }
 
-    void
-    Commit(bool in_recovery)
-    {
-      ::std::vector<std::unique_ptr<LogEntry> > pending;
-      while (!waiting_commits_.empty() && waiting_commits_.front()->index() <= data_committed_) {
-        auto &e = waiting_commits_.front();
-        while (!pending.empty() && e->index() <= pending.back()->index())
-          pending.pop_back();
-        pending.emplace_back(e.release());
-        waiting_commits_.pop_front();
-      }
-      for (auto &e : pending) {
-        server_->CommitLogEntry(this, *e);
-        last_log_committed_term_ = e->term();
-        last_log_committed_index_ = e->index();
-      }
-      CommitConfig(in_recovery);
+  void
+  Commit(bool in_recovery)
+  {
+    ::std::vector<std::unique_ptr<LogEntry> > pending;
+    while (!waiting_commits_.empty() && waiting_commits_.front()->index() <= data_committed_) {
+      auto &e = waiting_commits_.front();
+      while (!pending.empty() && e->index() <= pending.back()->index())
+        pending.pop_back();
+      pending.emplace_back(e.release());
+      waiting_commits_.pop_front();
+    }
+    for (auto &e : pending) {
+      server_->CommitLogEntry(this, *e);
+      last_log_committed_term_ = e->term();
+      last_log_committed_index_ = e->index();
     }
+    CommitConfig(in_recovery);
+  }
 
-    void
-    CommitConfig(bool in_recovery)
-    {
-      if (pending_config_.has_term() && pending_config_.term() == term_ && pending_config_.index() <= config_committed_) {
-        config_.Swap(&pending_config_);
-        pending_config_.Clear();
-        server_->ConfigChange(this, config_);
-        if (ConfigChanged()) {
-          NewTerm(term_ + 1, leader_, in_recovery);
-          if (!in_recovery)
-            HeartBeat();
-        }
+  void
+  CommitConfig(bool in_recovery)
+  {
+    if (pending_config_.has_term() && pending_config_.term() == term_ && pending_config_.index() <= config_committed_) {
+      config_.Swap(&pending_config_);
+      pending_config_.Clear();
+      server_->ConfigChange(this, config_);
+      if (ConfigChanged()) {
+        NewTerm(term_ + 1, leader_, in_recovery);
+        if (!in_recovery)
+          HeartBeat();
       }
     }
+  }
 
-    bool
-    ConfigChanged()
-    { // Returns: true if the leader_ changed.
-      other_nodes_.clear();
-      other_config_nodes_.clear();
-      replicas_.clear();
-      for (auto &n : config_.node())
-        if (n != node_) {
-          other_nodes_.insert(n);
-          other_config_nodes_.insert(n);
-        }
-      for (auto &n : pending_config_.node())
-        if (n != node_)
-          other_config_nodes_.insert(n);
-      replicas_.insert(config_.replica().begin(), config_.replica().end());
-      replicas_.insert(pending_config_.replica().begin(), pending_config_.replica().end());
-      replicas_.insert(other_nodes_.begin(), other_nodes_.end());
-      replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
-      ::std::string old_leader = leader_;
-      if (!other_nodes_.size())
-        leader_ = node_;
-      else if (!i_am_in_nodes() && other_nodes_.size() == 1)
-        leader_ = *other_nodes_.begin();
-      else if (leader_ == node_ && !i_am_in_nodes())
-        leader_ = "";
-      return leader_ != old_leader;
-    }
+  bool
+  ConfigChanged()
+  { // Returns: true if the leader_ changed.
+    other_nodes_.clear();
+    other_config_nodes_.clear();
+    replicas_.clear();
+    for (auto &n : config_.node())
+      if (n != node_) {
+        other_nodes_.insert(n);
+        other_config_nodes_.insert(n);
+      }
+    for (auto &n : pending_config_.node())
+      if (n != node_)
+        other_config_nodes_.insert(n);
+    replicas_.insert(config_.replica().begin(), config_.replica().end());
+    replicas_.insert(pending_config_.replica().begin(), pending_config_.replica().end());
+    replicas_.insert(other_nodes_.begin(), other_nodes_.end());
+    replicas_.insert(other_config_nodes_.begin(), other_config_nodes_.end());
+    ::std::string old_leader = leader_;
+    if (!other_nodes_.size())
+      leader_ = node_;
+    else if (!i_am_in_nodes() && other_nodes_.size() == 1)
+      leader_ = *other_nodes_.begin();
+    else if (leader_ == node_ && !i_am_in_nodes())
+      leader_ = "";
+    return leader_ != old_leader;
+  }
 
-    bool
-    SendReplicationMessage(const ::std::string &n, const LogEntry &entry, NodeState *s)
-    {
-      Message m(InitializeMessage());
-      m.mutable_entry()->CopyFrom(entry);
-      if (!server_->SendMessage(this, n, m))
-        return false;
-      s->sent_index = entry.index() + entry.extent();
-      s->sent_term = entry.term();
-      return true;
-    }
+  bool
+  SendReplicationMessage(const ::std::string &n, const LogEntry &entry, NodeState *s)
+  {
+    Message m(InitializeMessage());
+    m.mutable_entry()->CopyFrom(entry);
+    if (!server_->SendMessage(this, n, m))
+      return false;
+    s->sent_index = entry.index() + entry.extent();
+    s->sent_term = entry.term();
+    return true;
+  }
 
-    void
-    Replicate(const ::std::string &n, bool heartbeat)
-    {
-      bool sent = false;
-      auto &s = node_state_[n];
-      if (s.term == term_) { // Replica has acknowledged me as leader.
-        int64_t end = index_;
-        if (waiting_commits_.size())
-          end = waiting_commits_.front()->index() - 1;
-        while (s.sent_index < end) { // Get from server.
-          LogEntry entry;
-          server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
-          if (!entry.has_term()) {
-            // A summary log entry from the server with historical information.
-            entry.set_term(last_log_term_);
-            entry.set_index(s.sent_index + 1);
-          }
-          entry.set_previous_log_term(s.sent_term);
-          entry.set_previous_log_index(s.sent_index);
-          assert(entry.index() > s.sent_index);
-          int64_t x = s.sent_index;
-          if (!SendReplicationMessage(n, entry, &s))
-            break;
-          assert(s.sent_index > x);
-          sent = true;
-        }
-        for (auto &e : waiting_commits_) {
-          if (e->index() <= s.sent_index) // Skip those already sent.
-            continue;
-          if (!SendReplicationMessage(n, *e, &s))
-            break;
-          sent = true;
+  void
+  Replicate(const ::std::string &n, bool heartbeat)
+  {
+    bool sent = false;
+    auto &s = node_state_[n];
+    if (s.term == term_) { // Replica has acknowledged me as leader.
+      int64_t end = index_;
+      if (waiting_commits_.size())
+        end = waiting_commits_.front()->index() - 1;
+      while (s.sent_index < end) { // Get from server.
+        LogEntry entry;
+        server_->GetLogEntry(this, s.sent_term, s.sent_index + 1, end, &entry);
+        if (!entry.has_term()) {
+          // A summary log entry from the server with historical information.
+          entry.set_term(last_log_term_);
+          entry.set_index(s.sent_index + 1);
         }
+        entry.set_previous_log_term(s.sent_term);
+        entry.set_previous_log_index(s.sent_index);
+        assert(entry.index() > s.sent_index);
+        int64_t x = s.sent_index;
+        if (!SendReplicationMessage(n, entry, &s))
+          break;
+        assert(s.sent_index > x);
+        sent = true;
       }
-      if (heartbeat && !sent) {
-        Message m(InitializeMessage());
-        server_->SendMessage(this, n, m);
+      for (auto &e : waiting_commits_) {
+        if (e->index() <= s.sent_index) // Skip those already sent.
+          continue;
+        if (!SendReplicationMessage(n, *e, &s))
+          break;
+        sent = true;
       }
     }
-
-    void
-    ReplicateAll(bool heartbeat)
-    {
-      for (auto &n : replicas_)
-        Replicate(n, heartbeat);
-    }
-
-    bool
-    i_am_leader()
-    {
-      return node_ == leader_;
-    }
-    bool
-    i_am_in_nodes()
-    {
-      auto &n = config_.node();
-      return std::find(n.begin(), n.end(), node_) != n.end();
+    if (heartbeat && !sent) {
+      Message m(InitializeMessage());
+      server_->SendMessage(this, n, m);
     }
+  }
 
-    Server *const server_;
-    struct drand48_data rand_;
-    const ::std::string node_;
-    int64_t term_ = 0;           // Current term.
-    int64_t last_log_term_ = -1; // Term of last log entry this node has.
-    int64_t index_ = 0;          // Index of last log entry this node has.
-    int64_t config_committed_ = -1;
-    int64_t data_committed_ = -1;
-    int64_t last_log_committed_index_ = -1;
-    int64_t last_log_committed_term_ = -1;
-    double election_timeout_ = 1.0;
-    double last_heartbeat_ = -1.0e10;
-    double last_heartbeat_sent_ = -1.0e10;
-    double random_election_delay_ = 0.0;
-    ::std::string leader_; // The current leader.  "" if there is no leader.
-    ::std::string vote_;   // My vote this term.
-    Config config_;
-    Config pending_config_;
-    ::std::map< ::std::string, NodeState> node_state_;
-    ::std::deque<std::unique_ptr<LogEntry> > waiting_commits_;
-    bool seen_term_ = true;
-    // Cached values.
-    ::std::set< ::std::string> other_nodes_;        // Nodes required for consensus on log entries.
-    ::std::set< ::std::string> other_config_nodes_; // Nodes required for config changes.
-    ::std::set< ::std::string> replicas_;           // All nodes receiving the replication stream.
-  };
+  void
+  ReplicateAll(bool heartbeat)
+  {
+    for (auto &n : replicas_)
+      Replicate(n, heartbeat);
+  }
 
-  template <typename Server> Raft<Server> *NewRaft(Server * server, const ::std::string &node)
+  bool
+  i_am_leader()
   {
-    return new RaftImpl<Server>(server, node);
+    return node_ == leader_;
   }
+  bool
+  i_am_in_nodes()
+  {
+    auto &n = config_.node();
+    return std::find(n.begin(), n.end(), node_) != n.end();
+  }
+
+  Server *const server_;
+  struct drand48_data rand_;
+  const ::std::string node_;
+  int64_t term_ = 0;           // Current term.
+  int64_t last_log_term_ = -1; // Term of last log entry this node has.
+  int64_t index_ = 0;          // Index of last log entry this node has.
+  int64_t config_committed_ = -1;
+  int64_t data_committed_ = -1;
+  int64_t last_log_committed_index_ = -1;
+  int64_t last_log_committed_term_ = -1;
+  double election_timeout_ = 1.0;
+  double last_heartbeat_ = -1.0e10;
+  double last_heartbeat_sent_ = -1.0e10;
+  double random_election_delay_ = 0.0;
+  ::std::string leader_; // The current leader.  "" if there is no leader.
+  ::std::string vote_;   // My vote this term.
+  Config config_;
+  Config pending_config_;
+  ::std::map< ::std::string, NodeState> node_state_;
+  ::std::deque<std::unique_ptr<LogEntry> > waiting_commits_;
+  bool seen_term_ = true;
+  // Cached values.
+  ::std::set< ::std::string> other_nodes_;        // Nodes required for consensus on log entries.
+  ::std::set< ::std::string> other_config_nodes_; // Nodes required for config changes.
+  ::std::set< ::std::string> replicas_;           // All nodes receiving the replication stream.
+};
+
+template <typename Server>
+Raft<Server> *
+NewRaft(Server *server, const ::std::string &node)
+{
+  return new RaftImpl<Server>(server, node);
+}
 } // namespace raft
 #endif // CONSENSUS_IMPL_H_