You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:22 UTC

[02/10] git commit: Decoupled replicated log coordinator logic and made it asynchronous.

Decoupled replicated log coordinator logic and made it asynchronous.

This is the first patch of a series of patches that implement a
catch-up mechanism for replicated log. See the following ticket for
more details: https://issues.apache.org/jira/browse/MESOS-736.

Here is a brief summary of this patch: (Sorry for the fact that we are
not able to break it into smaller patches :().

1) Pulled the original Coordinator logic out and divides it into
several Paxos phases (see src/log/consensus.hpp). Instead of using a
blocking semantics, we implemented all the logics asynchronously.

2) In order to ensure the liveness of a catch-uper, we implemented a
retry logic by bumping the proposal number. This also requires us to
slightly change the existing replica protocol.

3) Made the "fill" operation independent of the underlying
replica. Instead, introduced a catchup (see src/log/catchup.hpp)
function to make sure the underlying local replica has learned each
write.

4) Modified the log tests to adapt to the new semantics (see (3)
above).

This is a joint work with Yan Xu.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/14631


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/19ad88b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/19ad88b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/19ad88b7

Branch: refs/heads/master
Commit: 19ad88b7c45164c1272001493bdd176d80a88b91
Parents: 2ff5308
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:51:45 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:51:45 2014 -0800

----------------------------------------------------------------------
 src/Makefile.am         |  17 +-
 src/log/catchup.cpp     | 286 +++++++++++++++++
 src/log/catchup.hpp     |  54 ++++
 src/log/consensus.cpp   | 711 +++++++++++++++++++++++++++++++++++++++++++
 src/log/consensus.hpp   | 136 +++++++++
 src/log/coordinator.cpp | 472 ++++++----------------------
 src/log/coordinator.hpp |  58 +---
 src/log/log.hpp         |  69 ++---
 src/log/network.hpp     |  57 ++--
 src/log/replica.cpp     | 351 +++++++++++----------
 src/log/replica.hpp     |  25 +-
 src/messages/log.proto  |  91 +++---
 src/tests/log_tests.cpp | 420 +++++++++++++------------
 13 files changed, 1866 insertions(+), 881 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index cf0c8c6..17fbf83 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -297,9 +297,20 @@ libmesos_no_3rdparty_la_LIBADD += libbuild.la
 # Convenience library for building the replicated log in order to
 # include the leveldb headers.
 noinst_LTLIBRARIES += liblog.la
-liblog_la_SOURCES = log/coordinator.cpp log/replica.cpp
-liblog_la_SOURCES += log/coordinator.hpp log/replica.hpp log/log.hpp	\
-  log/network.hpp messages/log.hpp messages/log.proto
+liblog_la_SOURCES =							\
+  log/catchup.cpp							\
+  log/consensus.cpp							\
+  log/coordinator.cpp							\
+  log/replica.cpp
+liblog_la_SOURCES +=							\
+  log/catchup.hpp							\
+  log/consensus.hpp							\
+  log/coordinator.hpp							\
+  log/replica.hpp							\
+  log/log.hpp								\
+  log/network.hpp							\
+  messages/log.hpp							\
+  messages/log.proto
 nodist_liblog_la_SOURCES = $(LOG_PROTOS)
 liblog_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
new file mode 100644
index 0000000..5825eae
--- /dev/null
+++ b/src/log/catchup.cpp
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+
+#include <process/collect.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/stringify.hpp>
+
+#include "log/catchup.hpp"
+#include "log/consensus.hpp"
+
+#include "messages/log.hpp"
+
+using namespace process;
+
+using std::list;
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+class CatchUpProcess : public Process<CatchUpProcess>
+{
+public:
+  CatchUpProcess(
+      size_t _quorum,
+      const Shared<Replica>& _replica,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      uint64_t _position)
+    : ProcessBase(ID::generate("log-catch-up")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network),
+      position(_position),
+      proposal(_proposal) {}
+
+  virtual ~CatchUpProcess() {}
+
+  Future<uint64_t> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    check();
+  }
+
+  virtual void finalize()
+  {
+    checking.discard();
+    filling.discard();
+  }
+
+private:
+  void check()
+  {
+    checking = replica->missing(position);
+    checking.onAny(defer(self(), &Self::checked));
+  }
+
+  void checked()
+  {
+    // The future 'checking' can only be discarded in 'finalize'.
+    CHECK(!checking.isDiscarded());
+
+    if (checking.isFailed()) {
+      promise.fail("Failed to get missing positions: " + checking.failure());
+      terminate(self());
+    } else if (!checking.get()) {
+      // The position has been learned.
+      promise.set(proposal);
+      terminate(self());
+    } else {
+      // Still missing, try to fill it.
+      fill();
+    }
+  }
+
+  void fill()
+  {
+    filling = log::fill(quorum, network, proposal, position);
+    filling.onAny(defer(self(), &Self::filled));
+  }
+
+  void filled()
+  {
+    // The future 'filling' can only be discarded in 'finalize'.
+    CHECK(!filling.isDiscarded());
+
+    if (filling.isFailed()) {
+      promise.fail("Failed to fill missing position: " + filling.failure());
+      terminate(self());
+    } else {
+      // Update the proposal number so that we can save a proposal
+      // number bump round trip if we need to invoke fill again.
+      CHECK(filling.get().promised() >= proposal);
+      proposal = filling.get().promised();
+
+      check();
+    }
+  }
+
+  const size_t quorum;
+  const Shared<Replica> replica;
+  const Shared<Network> network;
+  const uint64_t position;
+
+  uint64_t proposal;
+
+  process::Promise<uint64_t> promise;
+  Future<bool> checking;
+  Future<Action> filling;
+};
+
+
+// Catches-up a single log position in the local replica. This
+// function returns the highest proposal number seen. The returned
+// proposal number can be used to save extra proposal number bumps.
+static Future<uint64_t> catchup(
+    size_t quorum,
+    const Shared<Replica>& replica,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position)
+{
+  CatchUpProcess* process =
+    new CatchUpProcess(
+        quorum,
+        replica,
+        network,
+        proposal,
+        position);
+
+  Future<uint64_t> future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+
+// TODO(jieyu): Our current implementation catches-up each position in
+// the set sequentially. In the future, we may want to parallelize it
+// to improve the performance. Also, we may want to implement rate
+// control here so that we don't saturate the network or disk.
+class BulkCatchUpProcess : public Process<BulkCatchUpProcess>
+{
+public:
+  BulkCatchUpProcess(
+      size_t _quorum,
+      const Shared<Replica>& _replica,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      const set<uint64_t>& _positions)
+    : ProcessBase(ID::generate("log-bulk-catch-up")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network),
+      positions(_positions),
+      proposal(_proposal) {}
+
+  virtual ~BulkCatchUpProcess() {}
+
+  Future<Nothing> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    // Catch-up each position in the set sequentially.
+    it = positions.begin();
+
+    catchup();
+  }
+
+  virtual void finalize()
+  {
+    catching.discard();
+  }
+
+private:
+  void catchup()
+  {
+    if (it == positions.end()) {
+      promise.set(Nothing());
+      terminate(self());
+      return;
+    }
+
+    // Store the future so that we can discard it if the user wants to
+    // cancel the catch-up operation.
+    catching = log::catchup(quorum, replica, network, proposal, *it);
+    catching.onAny(defer(self(), &Self::caughtup));
+  }
+
+  void caughtup()
+  {
+    // No one can discard the future 'catching' except the 'finalize'.
+    CHECK(!catching.isDiscarded());
+
+    if (catching.isFailed()) {
+      promise.fail(
+          "Failed to catch-up position " + stringify(*it) +
+          ": " + catching.failure());
+      terminate(self());
+      return;
+    }
+
+    ++it;
+
+    // The single position catch-up function: 'log::catchup' will
+    // return the highest proposal number seen so far. We use this
+    // proposal number for the next 'catchup' as it is highly likely
+    // that this number is high enough, saving potentially unnecessary
+    // proposal number bumps.
+    proposal = catching.get();
+
+    catchup();
+  }
+
+  const size_t quorum;
+  const Shared<Replica> replica;
+  const Shared<Network> network;
+  const set<uint64_t> positions;
+
+  uint64_t proposal;
+  set<uint64_t>::iterator it;
+
+  process::Promise<Nothing> promise;
+  Future<uint64_t> catching;
+};
+
+
+/////////////////////////////////////////////////
+// Public interfaces below.
+/////////////////////////////////////////////////
+
+
+Future<Nothing> catchup(
+    size_t quorum,
+    const Shared<Replica>& replica,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const set<uint64_t>& positions)
+{
+  BulkCatchUpProcess* process =
+    new BulkCatchUpProcess(
+        quorum,
+        replica,
+        network,
+        proposal,
+        positions);
+
+  Future<Nothing> future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/catchup.hpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp
new file mode 100644
index 0000000..3652830
--- /dev/null
+++ b/src/log/catchup.hpp
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_CATCHUP_HPP__
+#define __LOG_CATCHUP_HPP__
+
+#include <stdint.h>
+
+#include <set>
+
+#include <process/future.hpp>
+#include <process/shared.hpp>
+
+#include <stout/nothing.hpp>
+
+#include "log/network.hpp"
+#include "log/replica.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Catches-up a set of log positions in the local replica. The user of
+// this function can provide a hint on the proposal number that will
+// be used for Paxos. This could potentially save us a few Paxos
+// rounds. However, if the user has no idea what proposal number to
+// use, he can just use an arbitrary proposal number (e.g., 0).
+extern process::Future<Nothing> catchup(
+    size_t quorum,
+    const process::Shared<Replica>& replica,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    const std::set<uint64_t>& positions);
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_CATCHUP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/consensus.cpp
----------------------------------------------------------------------
diff --git a/src/log/consensus.cpp b/src/log/consensus.cpp
new file mode 100644
index 0000000..5eb90e7
--- /dev/null
+++ b/src/log/consensus.cpp
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/foreach.hpp>
+
+#include "log/consensus.hpp"
+#include "log/replica.hpp"
+
+using namespace process;
+
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+class ExplicitPromiseProcess : public Process<ExplicitPromiseProcess>
+{
+public:
+  ExplicitPromiseProcess(
+      size_t _quorum,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      uint64_t _position)
+    : ProcessBase(ID::generate("log-explicit-promise")),
+      quorum(_quorum),
+      network(_network),
+      proposal(_proposal),
+      position(_position),
+      responsesReceived(0) {}
+
+  virtual ~ExplicitPromiseProcess() {}
+
+  Future<PromiseResponse> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    request.set_proposal(proposal);
+    request.set_position(position);
+
+    network->broadcast(protocol::promise, request)
+      .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+  }
+
+  virtual void finalize()
+  {
+    // This process will be terminated when we get responses from a
+    // quorum of replicas. In that case, we no longer care about
+    // responses from other replicas, thus discarding them here.
+    discard(responses);
+  }
+
+private:
+  void broadcasted(const Future<set<Future<PromiseResponse> > >& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to broadcast explicit promise request: " + future.failure() :
+          "Not expecting discarded future");
+      terminate(self());
+      return;
+    }
+
+    responses = future.get();
+    foreach (const Future<PromiseResponse>& response, responses) {
+      response.onReady(defer(self(), &Self::received, lambda::_1));
+    }
+  }
+
+  void received(const PromiseResponse& response)
+  {
+    responsesReceived++;
+
+    if (!response.okay()) {
+      // Failed to get the promise from a replica for this position
+      // because it has been promised to a proposer with a higher
+      // proposal number. The 'proposal' field in the response
+      // specifies the proposal number. It is found to be larger than
+      // the proposal number used in this phase.
+      if (highestNackProposal.isNone() ||
+          highestNackProposal.get() < response.proposal()) {
+        highestNackProposal = response.proposal();
+      }
+    } else if (highestNackProposal.isSome()) {
+      // We still want to wait for more potential NACK responses so we
+      // can return the highest proposal number seen but we don't care
+      // about any more ACK responses.
+    } else {
+      // The position has been promised to us so the 'proposal' field
+      // should match the proposal we sent in the request.
+      CHECK_EQ(response.proposal(), request.proposal());
+
+      if (response.has_action()) {
+        CHECK_EQ(response.action().position(), position);
+        if (response.action().has_learned() && response.action().learned()) {
+          // Received a learned action. Note that there is no checking
+          // that we get the _same_ learned action in the event we get
+          // multiple responses with learned actions, we just take the
+          // "first". In fact, there is a specific instance in which
+          // learned actions will NOT be the same! In this instance,
+          // one replica may return that the action is a learned no-op
+          // because it knows the position has been truncated while
+          // another replica (that hasn't learned the truncation yet)
+          // might return the actual action at this position. Picking
+          // either action is _correct_, since eventually we know this
+          // position will be truncated. Fun!
+          promise.set(response);
+
+          // The remaining responses will be discarded in 'finalize'.
+          terminate(self());
+          return;
+        } else if (response.action().has_performed()) {
+          // An action has already been performed in this position, we
+          // need to save the action with the highest proposal number.
+          if (highestAckAction.isNone() ||
+              (highestAckAction.get().performed() <
+               response.action().performed())) {
+            highestAckAction = response.action();
+          }
+        } else {
+          // Received a response for a position that had previously
+          // been promised to some other proposer but an action had
+          // not been performed or learned. The position is now
+          // promised to us. No need to do anything here.
+        }
+      } else {
+        // Received a response without an action associated with. This
+        // is the case where this proposer is this first one who asks
+        // promise for this log position.
+        CHECK(response.has_position());
+        CHECK_EQ(response.position(), position);
+      }
+    }
+
+    if (responsesReceived >= quorum) {
+      // A quorum of replicas have replied.
+      PromiseResponse result;
+
+      if (highestNackProposal.isSome()) {
+        result.set_okay(false);
+        result.set_proposal(highestNackProposal.get());
+      } else {
+        result.set_okay(true);
+        if (highestAckAction.isSome()) {
+          result.mutable_action()->CopyFrom(highestAckAction.get());
+        }
+      }
+
+      promise.set(result);
+      terminate(self());
+    }
+  }
+
+  const size_t quorum;
+  const Shared<Network> network;
+  const uint64_t proposal;
+  const uint64_t position;
+
+  PromiseRequest request;
+  set<Future<PromiseResponse> > responses;
+  size_t responsesReceived;
+  Option<uint64_t> highestNackProposal;
+  Option<Action> highestAckAction;
+
+  process::Promise<PromiseResponse> promise;
+};
+
+
+class ImplicitPromiseProcess : public Process<ImplicitPromiseProcess>
+{
+public:
+  ImplicitPromiseProcess(
+      size_t _quorum,
+      const Shared<Network>& _network,
+      uint64_t _proposal)
+    : ProcessBase(ID::generate("log-implicit-promise")),
+      quorum(_quorum),
+      network(_network),
+      proposal(_proposal),
+      responsesReceived(0) {}
+
+  virtual ~ImplicitPromiseProcess() {}
+
+  Future<PromiseResponse> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    request.set_proposal(proposal);
+
+    network->broadcast(protocol::promise, request)
+      .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+  }
+
+  virtual void finalize()
+  {
+    // This process will be terminated when we get responses from a
+    // quorum of replicas. In that case, we no longer care about
+    // responses from other replicas, thus discarding them here.
+    discard(responses);
+  }
+
+private:
+  void broadcasted(const Future<set<Future<PromiseResponse> > >& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to broadcast implicit promise request: " + future.failure() :
+          "Not expecting discarded future");
+      terminate(self());
+      return;
+    }
+
+    responses = future.get();
+    foreach (const Future<PromiseResponse>& response, responses) {
+      response.onReady(defer(self(), &Self::received, lambda::_1));
+    }
+  }
+
+  void received(const PromiseResponse& response)
+  {
+    responsesReceived++;
+
+    if (!response.okay()) {
+      // Failed to get the promise from a replica because it has
+      // promised a proposer with a higher proposal number. The
+      // 'proposal' field in the response specifies the proposal
+      // number. It is found to be larger than the proposal number
+      // used in this phase.
+      if (highestNackProposal.isNone() ||
+          highestNackProposal.get() < response.proposal()) {
+        highestNackProposal = response.proposal();
+      }
+    } else if (highestNackProposal.isSome()) {
+      // We still want to wait for more potential NACK responses so we
+      // can return the highest proposal number seen but we don't care
+      // about any more ACK responses.
+    } else {
+      CHECK(response.has_position());
+      if (highestEndPosition.isNone() ||
+          highestEndPosition.get() < response.position()) {
+        highestEndPosition = response.position();
+      }
+    }
+
+    if (responsesReceived >= quorum) {
+      // A quorum of replicas have replied.
+      PromiseResponse result;
+
+      if (highestNackProposal.isSome()) {
+        result.set_okay(false);
+        result.set_proposal(highestNackProposal.get());
+      } else {
+        CHECK_SOME(highestEndPosition);
+
+        result.set_okay(true);
+        result.set_position(highestEndPosition.get());
+      }
+
+      promise.set(result);
+      terminate(self());
+    }
+  }
+
+  const size_t quorum;
+  const Shared<Network> network;
+  const uint64_t proposal;
+
+  PromiseRequest request;
+  set<Future<PromiseResponse> > responses;
+  size_t responsesReceived;
+  Option<uint64_t> highestNackProposal;
+  Option<uint64_t> highestEndPosition;
+
+  process::Promise<PromiseResponse> promise;
+};
+
+
+class WriteProcess : public Process<WriteProcess>
+{
+public:
+  WriteProcess(
+      size_t _quorum,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      const Action& _action)
+    : ProcessBase(ID::generate("log-write")),
+      quorum(_quorum),
+      network(_network),
+      proposal(_proposal),
+      action(_action),
+      responsesReceived(0) {}
+
+  virtual ~WriteProcess() {}
+
+  Future<WriteResponse> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    request.set_proposal(proposal);
+    request.set_position(action.position());
+    request.set_type(action.type());
+    switch (action.type()) {
+      case Action::NOP:
+        CHECK(action.has_nop());
+        request.mutable_nop();
+        break;
+      case Action::APPEND:
+        CHECK(action.has_append());
+        request.mutable_append()->CopyFrom(action.append());
+        break;
+      case Action::TRUNCATE:
+        CHECK(action.has_truncate());
+        request.mutable_truncate()->CopyFrom(action.truncate());
+        break;
+      default:
+        LOG(FATAL) << "Unknown Action::Type " << action.type();
+    }
+
+    network->broadcast(protocol::write, request)
+      .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+  }
+
+  virtual void finalize()
+  {
+    // This process will be terminated when we get responses from a
+    // quorum of replicas. In that case, we no longer care about
+    // responses from other replicas, thus discarding them here.
+    discard(responses);
+  }
+
+private:
+  void broadcasted(const Future<set<Future<WriteResponse> > >& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to broadcast the write request: " + future.failure() :
+          "Not expecting discarded future");
+      terminate(self());
+      return;
+    }
+
+    responses = future.get();
+    foreach (const Future<WriteResponse>& response, responses) {
+      response.onReady(defer(self(), &Self::received, lambda::_1));
+    }
+  }
+
+  void received(const WriteResponse& response)
+  {
+    CHECK_EQ(response.position(), request.position());
+
+    responsesReceived++;
+
+    if (!response.okay()) {
+      // A replica rejects the write request because this position has
+      // been promised to a proposer with a higher proposal number.
+      // The 'proposal' field in the response specifies the proposal
+      // number. It is found to be larger than the proposal number
+      // used in this phase.
+      if (highestNackProposal.isNone() ||
+          highestNackProposal.get() < response.proposal()) {
+        highestNackProposal = response.proposal();
+      }
+    }
+
+    if (responsesReceived >= quorum) {
+      // A quorum of replicas have replied.
+      WriteResponse result;
+
+      if (highestNackProposal.isSome()) {
+        result.set_okay(false);
+        result.set_proposal(highestNackProposal.get());
+      } else {
+        result.set_okay(true);
+      }
+
+      promise.set(result);
+      terminate(self());
+    }
+  }
+
+  const size_t quorum;
+  const Shared<Network> network;
+  const uint64_t proposal;
+  const Action action;
+
+  WriteRequest request;
+  set<Future<WriteResponse> > responses;
+  size_t responsesReceived;
+  Option<uint64_t> highestNackProposal;
+
+  process::Promise<WriteResponse> promise;
+};
+
+
+class FillProcess : public Process<FillProcess>
+{
+public:
+  FillProcess(
+      size_t _quorum,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      uint64_t _position)
+    : ProcessBase(ID::generate("log-fill")),
+      quorum(_quorum),
+      network(_network),
+      position(_position),
+      proposal(_proposal) {}
+
+  virtual ~FillProcess() {}
+
+  Future<Action> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    runPromisePhase();
+  }
+
+  virtual void finalize()
+  {
+    promising.discard();
+    writing.discard();
+  }
+
+private:
+  void runPromisePhase()
+  {
+    promising = log::promise(quorum, network, proposal, position);
+    promising.onAny(defer(self(), &Self::checkPromisePhase));
+  }
+
+  void checkPromisePhase()
+  {
+    // The future 'promising' can only be discarded in 'finalize'
+    CHECK(!promising.isDiscarded());
+
+    if (promising.isFailed()) {
+      promise.fail("Explicit promise phase failed: " + promising.failure());
+      terminate(self());
+    } else {
+      const PromiseResponse& response = promising.get();
+      if (!response.okay()) {
+        // Retry with a higher proposal number.
+        retry(response.proposal());
+      } else if (response.has_action()) {
+        // A previously performed write has been found. Paxos
+        // restricts us to write the same value.
+        Action action = response.action();
+
+        CHECK_EQ(action.position(), position);
+        CHECK(action.has_type());
+        action.set_promised(proposal);
+        action.set_performed(proposal);
+
+        if (action.has_learned() && action.learned()) {
+          // If the promise phase returns a learned action, we simply
+          // learn the action by broadcasting a learned message. We
+          // don't check if a quorum of replicas acknowledge the
+          // learned message. Because of that, a catch-up replica
+          // needs to make sure that all positions it needs to recover
+          // have been learned before it can re-join the Paxos (i.e.,
+          // invoking log::catchup). Otherwise, we may not have a
+          // quorum of replicas remember an agreed value, leading to
+          // potential inconsistency in the log.
+          runLearnPhase(action);
+        } else {
+          runWritePhase(action);
+        }
+      } else {
+        // No previously performed write has been found. We can
+        // write any value. We choose to write a NOP.
+        Action action;
+        action.set_position(position);
+        action.set_promised(proposal);
+        action.set_performed(proposal);
+        action.set_type(Action::NOP);
+        action.mutable_nop();
+
+        runWritePhase(action);
+      }
+    }
+  }
+
+  void runWritePhase(const Action& action)
+  {
+    CHECK(!action.has_learned() || !action.learned());
+
+    writing = log::write(quorum, network, proposal, action);
+    writing.onAny(defer(self(), &Self::checkWritePhase, action));
+  }
+
+  void checkWritePhase(const Action& action)
+  {
+    // The future 'writing' can only be discarded in 'finalize'.
+    CHECK(!writing.isDiscarded());
+
+    if (writing.isFailed()) {
+      promise.fail("Write phase failed: " + writing.failure());
+      terminate(self());
+    } else {
+      const WriteResponse& response = writing.get();
+      if (!response.okay()) {
+        // Retry with a higher proposal number.
+        retry(response.proposal());
+      } else {
+        // The write has been accepted (and thus performed) by a
+        // quorum of replicas. A consensus has been reached.
+        Action learnedAction = action;
+        learnedAction.set_learned(true);
+
+        runLearnPhase(learnedAction);
+      }
+    }
+  }
+
+  void runLearnPhase(const Action& action)
+  {
+    CHECK(action.has_learned() && action.learned());
+
+    // We need to make sure that the learned message has been
+    // broadcasted before the fill process completes. Some users may
+    // rely on this invariant (e.g. checking if the local replica has
+    // learned the action).
+    log::learn(network, action)
+      .onAny(defer(self(), &Self::checkLearnPhase, action, lambda::_1));
+  }
+
+  void checkLearnPhase(const Action& action, const Future<Nothing>& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Write phase failed: " + future.failure() :
+          "Not expecting discarded future");
+      terminate(self());
+    } else {
+      promise.set(action);
+      terminate(self());
+    }
+  }
+
+  void retry(uint64_t highestNackProposal)
+  {
+    // See comments below.
+    static const Duration T = Milliseconds(100);
+
+    // Bump the proposal number.
+    CHECK(highestNackProposal >= proposal);
+    proposal = highestNackProposal + 1;
+
+    // Randomized back-off. Generate a random delay in [T, 2T). T has
+    // to be chosen carefully. We want T >> broadcast time such that
+    // one proposer usually times out and wins before others wake up.
+    // On the other hand, we want T to be as small as possible such
+    // that we can reduce the wait time.
+    Duration d = T * (1.0 + (double) ::random() / RAND_MAX);
+    delay(d, self(), &Self::runPromisePhase);
+  }
+
+  const size_t quorum;
+  const Shared<Network> network;
+  const uint64_t position;
+
+  uint64_t proposal;
+
+  process::Promise<Action> promise;
+  Future<PromiseResponse> promising;
+  Future<WriteResponse> writing;
+};
+
+
+/////////////////////////////////////////////////
+// Public interfaces below.
+/////////////////////////////////////////////////
+
+
+Future<PromiseResponse> promise(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const Option<uint64_t>& position)
+{
+  if (position.isNone()) {
+    ImplicitPromiseProcess* process =
+      new ImplicitPromiseProcess(
+          quorum,
+          network,
+          proposal);
+
+    Future<PromiseResponse> future = process->future();
+    spawn(process, true);
+    return future;
+  } else {
+    ExplicitPromiseProcess* process =
+      new ExplicitPromiseProcess(
+          quorum,
+          network,
+          proposal,
+          position.get());
+
+    Future<PromiseResponse> future = process->future();
+    spawn(process, true);
+    return future;
+  }
+}
+
+
+Future<WriteResponse> write(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const Action& action)
+{
+  WriteProcess* process =
+    new WriteProcess(
+        quorum,
+        network,
+        proposal,
+        action);
+
+  Future<WriteResponse> future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+
+Future<Nothing> learn(const Shared<Network>& network, const Action& action)
+{
+  LearnedMessage message;
+  message.mutable_action()->CopyFrom(action);
+
+  if (!action.has_learned() || !action.learned()) {
+    message.mutable_action()->set_learned(true);
+  }
+
+  return network->broadcast(message);
+}
+
+
+Future<Action> fill(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position)
+{
+  FillProcess* process =
+    new FillProcess(
+        quorum,
+        network,
+        proposal,
+        position);
+
+  Future<Action> future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/consensus.hpp
----------------------------------------------------------------------
diff --git a/src/log/consensus.hpp b/src/log/consensus.hpp
new file mode 100644
index 0000000..ba41601
--- /dev/null
+++ b/src/log/consensus.hpp
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_CONSENSUS_HPP__
+#define __LOG_CONSENSUS_HPP__
+
+#include <stdint.h>
+
+#include <process/future.hpp>
+#include <process/shared.hpp>
+
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "log/network.hpp"
+
+#include "messages/log.hpp"
+
+// We use Paxos consensus protocol to agree on the value of each entry
+// in the replicated log. In our system, each replica is both an
+// acceptor and a learner. There are several types of proposers in the
+// system. Coordinator is one type of proposers we use to append new
+// log entries. The 'log::fill' function below creates an internal
+// proposer each time it is called. These internal proposers are used
+// to agree on previously written entries in the log.
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Runs the promise phase (a.k.a., the prepare phase) in Paxos. This
+// phase has two purposes. First, the proposer asks promises from a
+// quorum of replicas not to accept writes from proposers with lower
+// proposal numbers. Second, the proposer looks for potential
+// previously agreed values. Only these values can be written in the
+// next phase. This restriction is used by Paxos to make sure that if
+// a value has been agreed on for a log position, subsequent writes to
+// this log position will always have the same value. We can run the
+// promise phase either for a specified log position ("explicit"
+// promise), or for all positions that have not yet been promised to
+// any proposer ("implicit" promise). The latter is a well known
+// optimization called Multi-Paxos. If the leader is relatively
+// stable, we can skip the promise phase for future instance of the
+// protocol with the same leader.
+//
+// We re-use PromiseResponse to specify the return value of this
+// phase. In the case of explicit promise, if a learned action has
+// been found in a response, this phase succeeds immediately with the
+// 'okay' field set to true and the 'action' field set to the learned
+// action. If no learned action has been found in a quorum of
+// replicas, we first check if some of them reply Nack (i.e., they
+// refuse to give promise). If yes, we set the 'okay' field to false
+// and set the 'proposal' field to be the highest proposal number seen
+// in these Nack responses. If none of them replies Nack, we set the
+// 'okay' field to true and set the 'action' field to be the action
+// that is performed by the proposer with the highest proposal number
+// in these responses. If no action has been found in these responses,
+// we leave the 'action' field unset.
+//
+// In the case of implicit promise, we must wait until a quorum of
+// replicas have replied. If some of them reply Nack, we set the
+// 'okay' field to false and set the 'proposal' field to be the
+// highest proposal number seen in these Nack responses. If none of
+// them replies Nack, we set the 'okay' field to true and set the
+// 'position' field to be the highest position (end position) seen in
+// these responses.
+extern process::Future<PromiseResponse> promise(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    const Option<uint64_t>& position = None());
+
+
+// Runs the write phase (a.k.a., the propose phase) in Paxos. In this
+// phase, the proposer broadcasts a write to replicas. This phase
+// succeeds if a quorum of replicas accept the write. A proposer
+// cannot write if it hasn't gained enough (i.e., a quorum of)
+// promises from replicas. We re-use WriteResponse to specify the
+// return value of this phase. We must wait until a quorum of replicas
+// have replied. If some of them reply Nack, we set the 'okay' field
+// to false and set the 'proposal' field to be the highest proposal
+// number seen in these Nack responses. If none of them replies Nack,
+// we set the 'okay' field to true.
+extern process::Future<WriteResponse> write(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    const Action& action);
+
+
+// Runs the learn phase (a.k.a, the commit phase) in Paxos. In fact,
+// this phase is not required, but treated as an optimization. In this
+// phase, a proposer broadcasts a learned message to replicas,
+// indicating that a consensus has already been reached for the given
+// log position. No need to wait for responses from replicas. When
+// the future is ready, the learned message has been broadcasted.
+extern process::Future<Nothing> learn(
+    const process::Shared<Network>& network,
+    const Action& action);
+
+
+// Tries to reach consensus for the given log position by running a
+// full Paxos round (i.e., promise -> write -> learn). If no value has
+// been previously agreed on for the given log position, a NOP will be
+// proposed. This function will automatically retry by bumping the
+// proposal number if the specified proposal number is found to be not
+// high enough. To ensure liveness, it will inject a random delay
+// before retrying. A learned action will be returned when the
+// operation succeeds.
+extern process::Future<Action> fill(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position);
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_CONSENSUS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/coordinator.cpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp
index 6e6466f..b2ead8e 100644
--- a/src/log/coordinator.cpp
+++ b/src/log/coordinator.cpp
@@ -18,38 +18,33 @@
 
 #include <algorithm>
 
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-
-#include <stout/check.hpp>
-#include <stout/duration.hpp>
 #include <stout/error.hpp>
-#include <stout/foreach.hpp>
 #include <stout/none.hpp>
 
+#include "log/catchup.hpp"
+#include "log/consensus.hpp"
 #include "log/coordinator.hpp"
-#include "log/replica.hpp"
+
+#include "messages/log.hpp"
 
 using namespace process;
 
-using std::list;
-using std::pair;
 using std::set;
 using std::string;
 
-
 namespace mesos {
 namespace internal {
 namespace log {
 
-Coordinator::Coordinator(int _quorum,
-                         Replica* _replica,
-                         Network* _network)
-  : elected(false),
-    quorum(_quorum),
+Coordinator::Coordinator(
+    size_t _quorum,
+    const Shared<Replica>& _replica,
+    const Shared<Network>& _network)
+  : quorum(_quorum),
     replica(_replica),
     network(_network),
-    id(0),
+    elected(false),
+    proposal(0),
     index(0) {}
 
 
@@ -67,94 +62,76 @@ Result<uint64_t> Coordinator::elect(const Timeout& timeout)
   }
 
   // Get the highest known promise from our local replica.
-  Future<uint64_t> promise = replica->promised();
+  Future<uint64_t> promised = replica->promised();
 
-  if (!promise.await(timeout.remaining())) {
+  if (!promised.await(timeout.remaining())) {
+    promised.discard();
     return None();
-  } else if (promise.isFailed()) {
-    return Error(promise.failure());
+  } else if (promised.isFailed()) {
+    return Error(promised.failure());
   }
 
-  CHECK(promise.isReady()) << "Not expecting a discarded future!";
-
-  id = std::max(id, promise.get()) + 1; // Try the next highest!
-
-  PromiseRequest request;
-  request.set_id(id);
-
-  // Broadcast the request to the network.
-  set<Future<PromiseResponse> > futures =
-    broadcast(protocol::promise, request);
-
-  uint32_t okays = 0;
-
-  do {
-    Future<Future<PromiseResponse> > future = select(futures);
-    if (future.await(timeout.remaining())) {
-      CHECK(future.get().isReady());
-      const PromiseResponse& response = future.get().get();
-      if (!response.okay()) {
-        return None(); // Lost an election, but can retry.
-      } else if (response.okay()) {
-        CHECK(response.has_position());
-        index = std::max(index, response.position());
-        okays++;
-        if (okays >= quorum) {
-          break;
-        }
-      }
-      futures.erase(future.get());
-    }
-  } while (timeout.remaining() > Seconds(0));
+  CHECK(promised.isReady()) << "Not expecting a discarded future!";
+
+  proposal = std::max(proposal, promised.get()) + 1; // Try the next highest!
+
+  // Run the implicit promise phase.
+  Future<PromiseResponse> promising = log::promise(quorum, network, proposal);
+
+  if (!promising.await(timeout.remaining())) {
+    promising.discard();
+    return None();
+  } else if (promising.isFailed()) {
+    return Error(promising.failure());
+  }
 
-  // Discard the remaining futures.
-  discard(futures);
+  CHECK(promising.isReady()) << "Not expecting a discarded future!";
 
-  // Either we have a quorum or we timed out.
-  if (okays >= quorum) {
+  const PromiseResponse& response = promising.get();
+  if (!response.okay()) {
+    // Lost an election, but can retry.
+    proposal = response.proposal();
+    return None();
+  } else {
     LOG(INFO) << "Coordinator elected, attempting to fill missing positions";
-    elected = true;
 
-    // Need to "catchup" local replica (i.e., fill in any unlearned
+    CHECK(response.has_position());
+
+    index = response.position();
+
+    // Need to "catch-up" local replica (i.e., fill in any unlearned
     // and/or missing positions) so that we can do local reads.
     // Usually we could do this lazily, however, a local learned
     // position might have been truncated, so we actually need to
-    // catchup the local replica all the way to the end of the log
+    // catch-up the local replica all the way to the end of the log
     // before we can perform any up-to-date local reads.
 
-    Future<set<uint64_t> > positions = replica->missing(index);
+    Future<set<uint64_t> > positions = replica->missing(0, index);
 
     if (!positions.await(timeout.remaining())) {
-      elected = false;
+      positions.discard();
       return None();
     } else if (positions.isFailed()) {
-      elected = false;
       return Error(positions.failure());
     }
 
     CHECK(positions.isReady()) << "Not expecting a discarded future!";
 
-    foreach (uint64_t position, positions.get()) {
-      Result<Action> result = fill(position, timeout);
-      if (result.isError()) {
-        elected = false;
-        return Error(result.error());
-      } else if (result.isNone()) {
-        elected = false;
-        return None();
-      } else {
-        CHECK_SOME(result);
-        CHECK(result.get().position() == position);
-      }
+    Future<Nothing> catching =
+      log::catchup(quorum, replica, network, proposal, positions.get());
+
+    if (!catching.await(timeout.remaining())) {
+      catching.discard();
+      return None();
+    } else if (catching.isFailed()) {
+      return Error(catching.failure());
     }
 
-    index += 1;
-    return index - 1;
-  }
+    CHECK(catching.isReady()) << "Not expecting a discarded future!";
 
-  // Timed out ...
-  LOG(INFO) << "Coordinator timed out while trying to get elected";
-  return None();
+    elected = true;
+    return index++;
+  }
 }
 
 
@@ -175,8 +152,8 @@ Result<uint64_t> Coordinator::append(
 
   Action action;
   action.set_position(index);
-  action.set_promised(id);
-  action.set_performed(id);
+  action.set_promised(proposal);
+  action.set_performed(proposal);
   action.set_type(Action::APPEND);
   Action::Append* append = action.mutable_append();
   append->set_bytes(bytes);
@@ -184,7 +161,7 @@ Result<uint64_t> Coordinator::append(
   Result<uint64_t> result = write(action, timeout);
 
   if (result.isSome()) {
-    CHECK(result.get() == index);
+    CHECK_EQ(result.get(), index);
     index++;
   }
 
@@ -202,8 +179,8 @@ Result<uint64_t> Coordinator::truncate(
 
   Action action;
   action.set_position(index);
-  action.set_promised(id);
-  action.set_performed(id);
+  action.set_promised(proposal);
+  action.set_performed(proposal);
   action.set_type(Action::TRUNCATE);
   Action::Truncate* truncate = action.mutable_truncate();
   truncate->set_to(to);
@@ -211,7 +188,7 @@ Result<uint64_t> Coordinator::truncate(
   Result<uint64_t> result = write(action, timeout);
 
   if (result.isSome()) {
-    CHECK(result.get() == index);
+    CHECK_EQ(result.get(), index);
     index++;
   }
 
@@ -233,309 +210,60 @@ Result<uint64_t> Coordinator::write(
   CHECK(action.has_performed());
   CHECK(action.has_type());
 
-  // TODO(benh): Eliminate this special case hack?
-  if (quorum == 1) {
-    Result<uint64_t> result = commit(action);
-    if (result.isError()) {
-      return Error(result.error());
-    } else if (result.isNone()) {
-      return None();
-    } else {
-      CHECK_SOME(result);
-      return action.position();
-    }
-  }
-
-  WriteRequest request;
-  request.set_id(id);
-  request.set_position(action.position());
-  request.set_type(action.type());
-  switch (action.type()) {
-    case Action::NOP:
-      CHECK(action.has_nop());
-      request.mutable_nop();
-      break;
-    case Action::APPEND:
-      CHECK(action.has_append());
-      request.mutable_append()->MergeFrom(action.append());
-      break;
-    case Action::TRUNCATE:
-      CHECK(action.has_truncate());
-      request.mutable_truncate()->MergeFrom(action.truncate());
-      break;
-    default:
-      LOG(FATAL) << "Unknown Action::Type!";
-  }
-
-  // Broadcast the request to the network *excluding* the local replica.
-  set<Future<WriteResponse> > futures =
-    remotecast(protocol::write, request);
-
-  uint32_t okays = 0;
-
-  do {
-    Future<Future<WriteResponse> > future = select(futures);
-    if (future.await(timeout.remaining())) {
-      CHECK(future.get().isReady());
-      const WriteResponse& response = future.get().get();
-      CHECK(response.id() == request.id());
-      CHECK(response.position() == request.position());
-      if (!response.okay()) {
-        elected = false;
-        return Error("Coordinator demoted");
-      } else if (response.okay()) {
-        if (++okays >= (quorum - 1)) { // N.B. Using (quorum - 1) here!
-          // Got enough remote okays, discard the remaining futures
-          // and try and commit the action locally.
-          discard(futures);
-          Result<uint64_t> result = commit(action);
-          if (result.isError()) {
-            return Error(result.error());
-          } else if (result.isNone()) {
-            return None();
-          } else {
-            CHECK_SOME(result);
-            return action.position();
-          }
-        }
-      }
-      futures.erase(future.get());
-    }
-  } while (timeout.remaining() > Seconds(0));
-
-  // Timed out ... discard remaining futures.
-  LOG(INFO) << "Coordinator timed out while attempting to write "
-            << Action::Type_Name(action.type())
-            << " action at position " << action.position();
-  discard(futures);
-  return None();
-}
-
-
-Result<uint64_t> Coordinator::commit(const Action& action)
-{
-  LOG(INFO) << "Coordinator attempting to commit "
-            << Action::Type_Name(action.type())
-            << " action at position " << action.position();
-
-  CHECK(elected);
-
-  WriteRequest request;
-  request.set_id(id);
-  request.set_position(action.position());
-  request.set_learned(true); // A commit is just a learned write.
-  request.set_type(action.type());
-  switch (action.type()) {
-    case Action::NOP:
-      CHECK(action.has_nop());
-      request.mutable_nop();
-      break;
-    case Action::APPEND:
-      CHECK(action.has_append());
-      request.mutable_append()->MergeFrom(action.append());
-      break;
-    case Action::TRUNCATE:
-      CHECK(action.has_truncate());
-      request.mutable_truncate()->MergeFrom(action.truncate());
-      break;
-    default:
-      LOG(FATAL) << "Unknown Action::Type!";
-  }
-
-  //  TODO(benh): Add a non-message based way to do this write.
-  Future<WriteResponse> future = protocol::write(replica->pid(), request);
-
-  // We send a write request to the *local* replica just as the
-  // others: asynchronously via messages. However, rather than add the
-  // complications of dealing with timeouts for local operations
-  // (especially since we are trying to commit something), we make
-  // things simpler and block on the response from the local replica.
-  // Maybe we can let it timeout, but consider it a failure? This
-  // might be sound because we don't send the learned messages ... so
-  // this should be the same as if we just failed before we even do
-  // the write ... a client should just retry this write later.
+  Future<WriteResponse> writing =
+    log::write(quorum, network, proposal, action);
 
-  future.await(); // TODO(benh): Don't wait forever, see comment above.
-
-  if (future.isFailed()) {
-    return Error(future.failure());
+  if (!writing.await(timeout.remaining())) {
+    writing.discard();
+    return None();
+  } else if (writing.isFailed()) {
+    return Error(writing.failure());
   }
 
-  CHECK(future.isReady()) << "Not expecting a discarded future!";
-
-  const WriteResponse& response = future.get();
-  CHECK(response.id() == request.id());
-  CHECK(response.position() == request.position());
+  CHECK(writing.isReady()) << "Not expecting a discarded future!";
 
+  const WriteResponse& response = writing.get();
   if (!response.okay()) {
     elected = false;
+    proposal = response.proposal();
     return Error("Coordinator demoted");
-  }
-
-  // Commit successful, send a learned message to the network
-  // *excluding* the local replica and return the position.
-
-  LearnedMessage message;
-  message.mutable_action()->MergeFrom(action);
-
-  if (!action.has_learned() || !action.learned()) {
-    message.mutable_action()->set_learned(true);
-  }
-
-  LOG(INFO) << "Telling other replicas of learned action at position "
-            << action.position();
-
-  remotecast(message);
-
-  return action.position();
-}
-
-
-Result<Action> Coordinator::fill(uint64_t position, const Timeout& timeout)
-{
-  LOG(INFO) << "Coordinator attempting to fill position "
-            << position << " in the log";
-
-  CHECK(elected);
-
-  PromiseRequest request;
-  request.set_id(id);
-  request.set_position(position);
-
-  // Broadcast the request to the network.
-  set<Future<PromiseResponse> > futures =
-    broadcast(protocol::promise, request);
-
-  list<PromiseResponse> responses;
-
-  do {
-    Future<Future<PromiseResponse> > future = select(futures);
-    if (future.await(timeout.remaining())) {
-      CHECK(future.get().isReady());
-      const PromiseResponse& response = future.get().get();
-      CHECK(response.id() == request.id());
-      if (!response.okay()) {
-        elected = false;
-        return Error("Coordinator demoted");
-      } else if (response.okay()) {
-        responses.push_back(response);
-        if (responses.size() >= quorum) {
-          break;
-        }
-      }
-      futures.erase(future.get());
-    }
-  } while (timeout.remaining() > Seconds(0));
-
-  // Discard the remaining futures.
-  discard(futures);
-
-  // Either have a quorum or we timed out.
-  if (responses.size() >= quorum) {
-    // Check the responses for a learned action, otherwise, pick the
-    // action with the higest performed id or a no-op if no responses
-    // include performed actions.
-    Action action;
-    foreach (const PromiseResponse& response, responses) {
-      if (response.has_action()) {
-        CHECK(response.action().position() == position);
-        if (response.action().has_learned() && response.action().learned()) {
-          // Received a learned action, try and commit locally. Note
-          // that there is no checking that we get the _same_ learned
-          // action in the event we get multiple responses with
-          // learned actions, we just take the "first". In fact, there
-          // is a specific instance in which learned actions will NOT
-          // be the same! In this instance, one replica may return
-          // that the action is a learned no-op because it knows the
-          // position has been truncated while another replica (that
-          // hasn't learned the truncation yet) might return the
-          // actual action at this position. Picking either action is
-          // _correct_, since eventually we know this position will be
-          // truncated. Fun!
-          Result<uint64_t> result = commit(response.action());
-          if (result.isError()) {
-            return Error(result.error());
-          } else if (result.isNone()) {
-            return None();
-          } else {
-            CHECK_SOME(result);
-            return response.action();
-          }
-        } else if (response.action().has_performed() &&
-                   (!action.has_performed() ||
-                    response.action().performed() > action.performed())) {
-          action = response.action();
-        }
-      } else {
-        CHECK(response.has_position());
-        CHECK(response.position() == position);
-      }
+  } else {
+    // TODO(jieyu): Currently, each log operation (append or truncate)
+    // will write the same log content to the local disk twice: one
+    // from log::write() and one from log::learn(). In the future, we
+    // may want to use checksum to eliminate the duplicate disk write.
+    Future<Nothing> learning = log::learn(network, action);
+
+    // We need to make sure that learned message has been broadcasted,
+    // thus has been enqueued.  Otherwise, our "missing" check below
+    // will fail sometimes due to race condition.
+    if (!learning.await(timeout.remaining())) {
+      learning.discard();
+      return None();
+    } else if (learning.isFailed()) {
+      return Error(learning.failure());
     }
 
-    // Use a no-op if no known action has been performed.
-    if (!action.has_performed()) {
-      action.set_position(position);
-      action.set_promised(id);
-      action.set_performed(id);
-      action.set_type(Action::NOP);
-      action.mutable_nop();
-    } else {
-      action.set_performed(id);
-    }
+    CHECK(learning.isReady()) << "Not expecting a discarded future!";
 
-    Result<uint64_t> result = write(action, timeout);
+    // Make sure that the local replica has learned the newly written
+    // log entry. Since messages are delivered and dispatched in order
+    // locally, we should always have the new entry learned by now.
+    Future<bool> checking = replica->missing(action.position());
 
-    if (result.isError()) {
-      return Error(result.error());
-    } else if (result.isNone()) {
+    if (!checking.await(timeout.remaining())) {
+      checking.discard();
       return None();
-    } else {
-      CHECK_SOME(result);
-      return action;
+    } else if (checking.isFailed()) {
+      return Error(checking.failure());
     }
-  }
-
-  // Timed out ...
-  LOG(INFO) << "Coordinator timed out attempting to fill position "
-            << position << " in the log";
-  return None();
-}
 
+    CHECK(checking.isReady()) << "Not expecting a discarded future!";
 
-template <typename Req, typename Res>
-set<Future<Res> > Coordinator::broadcast(
-    const Protocol<Req, Res>& protocol,
-    const Req& req)
-{
-  Future<set<Future<Res> > > futures =
-    network->broadcast(protocol, req);
-  futures.await();
-  CHECK(futures.isReady());
-  return futures.get();
-}
-
+    CHECK(!checking.get());
 
-template <typename Req, typename Res>
-set<Future<Res> > Coordinator::remotecast(
-    const Protocol<Req, Res>& protocol,
-    const Req& req)
-{
-  set<UPID> filter;
-  filter.insert(replica->pid());
-  Future<set<Future<Res> > > futures =
-    network->broadcast(protocol, req, filter);
-  futures.await();
-  CHECK(futures.isReady());
-  return futures.get();
-}
-
-
-template <typename M>
-void Coordinator::remotecast(const M& m)
-{
-  set<UPID> filter;
-  filter.insert(replica->pid());
-  network->broadcast(m, filter);
+    return action.position();
+  }
 }
 
 } // namespace log {

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/coordinator.hpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.hpp b/src/log/coordinator.hpp
index 3f6fb7c..b0ff8df 100644
--- a/src/log/coordinator.hpp
+++ b/src/log/coordinator.hpp
@@ -19,10 +19,11 @@
 #ifndef __LOG_COORDINATOR_HPP__
 #define __LOG_COORDINATOR_HPP__
 
+#include <stdint.h>
+
 #include <string>
-#include <vector>
 
-#include <process/process.hpp>
+#include <process/shared.hpp>
 #include <process/timeout.hpp>
 
 #include <stout/result.hpp>
@@ -30,9 +31,6 @@
 #include "log/network.hpp"
 #include "log/replica.hpp"
 
-#include "messages/log.hpp"
-
-
 namespace mesos {
 namespace internal {
 namespace log {
@@ -40,7 +38,10 @@ namespace log {
 class Coordinator
 {
 public:
-  Coordinator(int quorum, Replica* replica, Network* group);
+  Coordinator(
+      size_t _quorum,
+      const process::Shared<Replica>& _replica,
+      const process::Shared<Network>& _network);
 
   ~Coordinator();
 
@@ -65,45 +66,16 @@ public:
   Result<uint64_t> truncate(uint64_t to, const process::Timeout& timeout);
 
 private:
-  // Helper that tries to achieve consensus of the specified action. A
-  // result of none means the write failed (e.g., due to timeout), but
-  // can be retried.
-  Result<uint64_t> write(const Action& action, const process::Timeout& timeout);
-
-  // Helper that handles commiting an action (i.e., writing to the
-  // local replica and then sending out learned messages).
-  Result<uint64_t> commit(const Action& action);
-
-  // Helper that tries to fill a position in the log.
-  Result<Action> fill(uint64_t position, const process::Timeout& timeout);
-
-  // Helper that uses the specified protocol to broadcast a request to
-  // our group and return a set of futures.
-  template <typename Req, typename Res>
-  std::set<process::Future<Res> > broadcast(
-      const Protocol<Req, Res>& protocol,
-      const Req& req);
-
-  // Helper like broadcast, but excludes our local replica.
-  template <typename Req, typename Res>
-  std::set<process::Future<Res> > remotecast(
-      const Protocol<Req, Res>& protocol,
-      const Req& req);
-
-  // Helper like remotecast but ignores any responses.
-  template <typename M>
-  void remotecast(const M& m);
-
-  bool elected; // True if this coordinator has been elected.
-
-  const uint32_t quorum; // Quorum size.
-
-  Replica* replica; // Local log replica.
-
-  Network* network; // Used to broadcast requests and messages to replicas.
+  Result<uint64_t> write(
+      const Action& action,
+      const process::Timeout& timeout);
 
-  uint64_t id; // Coordinator ID.
+  const size_t quorum;
+  const process::Shared<Replica> replica;
+  const process::Shared<Network> network;
 
+  bool elected; // True if this coordinator has been elected.
+  uint64_t proposal; // Currently used proposal number.
   uint64_t index; // Last position written in the log.
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 77edc7a..042f13b 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -23,7 +23,9 @@
 #include <set>
 #include <string>
 
+#include <process/owned.hpp>
 #include <process/process.hpp>
+#include <process/shared.hpp>
 #include <process/timeout.hpp>
 
 #include <stout/check.hpp>
@@ -139,7 +141,7 @@ public:
     Position ending();
 
   private:
-    Replica* replica;
+    process::Shared<Replica> replica;
   };
 
   class Writer
@@ -179,18 +181,18 @@ public:
   Log(int _quorum,
       const std::string& path,
       const std::set<process::UPID>& pids)
-    : group(NULL)
+    : group(NULL),
+      executor(NULL),
+      quorum(_quorum),
+      replica(new Replica(path))
   {
     GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-    quorum = _quorum;
+    // Add our own replica to the network.
+    Network* _network = new Network(pids);
+    _network->add(replica->pid());
 
-    replica = new Replica(path);
-
-    network = new Network(pids);
-
-    // Don't forget to add our own replica!
-    network->add(replica->pid());
+    network.reset(_network);
   }
 
   // Creates a new replicated log that assumes the specified quorum
@@ -203,36 +205,34 @@ public:
       const Duration& timeout,
       const std::string& znode,
       const Option<zookeeper::Authentication>& auth = None())
+    : group(new zookeeper::Group(servers, timeout, znode, auth)),
+      executor(new process::Executor()),
+      quorum(_quorum),
+      replica(new Replica(path)),
+      network(new ZooKeeperNetwork(servers, timeout, znode, auth))
   {
     GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-    quorum = _quorum;
-
-    LOG(INFO) << "Creating a new log replica";
-
-    replica = new Replica(path);
-
-    group = new zookeeper::Group(servers, timeout, znode, auth);
-    network = new ZooKeeperNetwork(group);
-
     // Need to add our replica to the ZooKeeper group!
     LOG(INFO) << "Attempting to join replica to ZooKeeper group";
 
     membership = group->join(replica->pid())
-      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
 
     group->watch()
-      .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
-      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+      .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
+      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
   }
 
   ~Log()
   {
-    delete network;
+    network.own().await();
+    replica.own().await();
+
+    delete executor;
     delete group;
-    delete replica;
   }
 
   // Returns a position based off of the bytes recovered from
@@ -261,14 +261,15 @@ private:
   void failed(const std::string& message) const;
   void discarded() const;
 
+  // We store a Group instance in order to continually renew the
+  // replicas membership (when using ZooKeeper).
   zookeeper::Group* group;
   process::Future<zookeeper::Group::Membership> membership;
-  process::Executor executor;
+  process::Executor* executor;
 
   int quorum;
-
-  Replica* replica;
-  Network* network;
+  process::Shared<Replica> replica;
+  process::Shared<Network> network;
 };
 
 
@@ -420,14 +421,14 @@ void Log::watch(const std::set<zookeeper::Group::Membership>& memberships)
     // Our replica's membership must have expired, join back up.
     LOG(INFO) << "Renewing replica group membership";
     membership = group->join(replica->pid())
-      .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
   }
 
   group->watch(memberships)
-    .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
-    .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
-    .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+    .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
+    .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+    .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index d34cf78..2b674f6 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -33,6 +33,7 @@
 #include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 
 #include "logging/logging.hpp"
 
@@ -67,13 +68,14 @@ public:
   process::Future<std::set<process::Future<Res> > > broadcast(
       const Protocol<Req, Res>& protocol,
       const Req& req,
-      const std::set<process::UPID>& filter = std::set<process::UPID>());
+      const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
 
-  // Sends a message to each member of the network.
+  // Sends a message to each member of the network. The returned
+  // future is set when the message is broadcasted.
   template <typename M>
-  void broadcast(
+  process::Future<Nothing> broadcast(
       const M& m,
-      const std::set<process::UPID>& filter = std::set<process::UPID>());
+      const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
 
 private:
   // Not copyable, not assignable.
@@ -87,11 +89,19 @@ private:
 class ZooKeeperNetwork : public Network
 {
 public:
-  ZooKeeperNetwork(zookeeper::Group* group);
+  ZooKeeperNetwork(
+      const std::string& servers,
+      const Duration& timeout,
+      const std::string& znode,
+      const Option<zookeeper::Authentication>& auth);
 
 private:
   typedef ZooKeeperNetwork This;
 
+  // Not copyable, not assignable.
+  ZooKeeperNetwork(const ZooKeeperNetwork&);
+  ZooKeeperNetwork& operator = (const ZooKeeperNetwork&);
+
   // Helper that sets up a watch on the group.
   void watch(const std::set<zookeeper::Group::Membership>& expected);
 
@@ -101,9 +111,13 @@ private:
   // Invoked when group members data has been collected.
   void collected(const process::Future<std::list<std::string> >& datas);
 
-  zookeeper::Group* group;
-  process::Executor executor;
+  zookeeper::Group group;
   process::Future<std::set<zookeeper::Group::Membership> > memberships;
+
+  // NOTE: The declaration order here is important. We want to delete
+  // the 'executor' before we delete the 'group' so that we don't get
+  // spurious fatal errors when the 'group' is being deleted.
+  process::Executor executor;
 };
 
 
@@ -157,7 +171,7 @@ public:
   }
 
   template <typename M>
-  void broadcast(
+  Nothing broadcast(
       const M& m,
       const std::set<process::UPID>& filter)
   {
@@ -168,6 +182,7 @@ public:
         process::post(pid, m);
       }
     }
+    return Nothing();
   }
 
 private:
@@ -223,7 +238,7 @@ template <typename Req, typename Res>
 process::Future<std::set<process::Future<Res> > > Network::broadcast(
     const Protocol<Req, Res>& protocol,
     const Req& req,
-    const std::set<process::UPID>& filter)
+    const std::set<process::UPID>& filter) const
 {
   return process::dispatch(process, &NetworkProcess::broadcast<Req, Res>,
                            protocol, req, filter);
@@ -231,20 +246,24 @@ process::Future<std::set<process::Future<Res> > > Network::broadcast(
 
 
 template <typename M>
-void Network::broadcast(
+process::Future<Nothing> Network::broadcast(
     const M& m,
-    const std::set<process::UPID>& filter)
+    const std::set<process::UPID>& filter) const
 {
   // Need to disambiguate overloaded function.
-  void (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&) =
-    &NetworkProcess::broadcast<M>;
+  Nothing (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&)
+    = &NetworkProcess::broadcast<M>;
 
-  process::dispatch(process, broadcast, m, filter);
+  return process::dispatch(process, broadcast, m, filter);
 }
 
 
-inline ZooKeeperNetwork::ZooKeeperNetwork(zookeeper::Group* _group)
-  : group(_group)
+inline ZooKeeperNetwork::ZooKeeperNetwork(
+    const std::string& servers,
+    const Duration& timeout,
+    const std::string& znode,
+    const Option<zookeeper::Authentication>& auth)
+  : group(servers, timeout, znode, auth)
 {
   watch(std::set<zookeeper::Group::Membership>());
 }
@@ -253,7 +272,7 @@ inline ZooKeeperNetwork::ZooKeeperNetwork(zookeeper::Group* _group)
 inline void ZooKeeperNetwork::watch(
     const std::set<zookeeper::Group::Membership>& expected)
 {
-  memberships = group->watch(expected);
+  memberships = group.watch(expected);
   memberships
     .onAny(executor.defer(lambda::bind(&This::watched, this, lambda::_1)));
 }
@@ -264,7 +283,7 @@ inline void ZooKeeperNetwork::watched(
 {
   if (memberships.isFailed()) {
     // We can't do much here, we could try creating another Group but
-    // that might just continue indifinitely, so we fail early
+    // that might just continue indefinitely, so we fail early
     // instead. Note that Group handles all retryable/recoverable
     // ZooKeeper errors internally.
     LOG(FATAL) << "Failed to watch ZooKeeper group: " << memberships.failure();
@@ -278,7 +297,7 @@ inline void ZooKeeperNetwork::watched(
   std::list<process::Future<std::string> > futures;
 
   foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
-    futures.push_back(group->data(membership));
+    futures.push_back(group.data(membership));
   }
 
   process::collect(futures, process::Timeout::in(Seconds(5)))