You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/08 04:44:01 UTC

[1/4] mesos git commit: Added operators for offer operation update protobuf classes.

Repository: mesos
Updated Branches:
  refs/heads/master 02758c4e7 -> 884459168


Added operators for offer operation update protobuf classes.

Review: https://reviews.apache.org/r/64093/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/06209e61
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/06209e61
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/06209e61

Branch: refs/heads/master
Commit: 06209e614c6f4a0593d017ab293bd1819acbc02b
Parents: 02758c4
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Thu Dec 7 19:59:46 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Dec 7 20:41:54 2017 -0800

----------------------------------------------------------------------
 include/mesos/type_utils.hpp |  9 +++++
 src/common/type_utils.cpp    | 49 +++++++++++++++++++++++
 src/messages/messages.cpp    | 83 ++++++++++++++++++++++++++++++++++++++-
 src/messages/messages.hpp    | 15 +++++++
 4 files changed, 154 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/06209e61/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index b786d0e..506f667 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -64,6 +64,10 @@ bool operator==(const Labels& left, const Labels& right);
 bool operator==(const MasterInfo& left, const MasterInfo& right);
 
 bool operator==(
+    const OfferOperationStatus& left,
+    const OfferOperationStatus& right);
+
+bool operator==(
     const ResourceProviderInfo& left,
     const ResourceProviderInfo& right);
 
@@ -82,6 +86,11 @@ bool operator==(const Volume& left, const Volume& right);
 bool operator!=(const CheckStatusInfo& left, const CheckStatusInfo& right);
 bool operator!=(const ExecutorInfo& left, const ExecutorInfo& right);
 bool operator!=(const Labels& left, const Labels& right);
+
+bool operator!=(
+    const OfferOperationStatus& left,
+    const OfferOperationStatus& right);
+
 bool operator!=(const TaskStatus& left, const TaskStatus& right);
 
 inline bool operator==(const ExecutorID& left, const ExecutorID& right)

http://git-wip-us.apache.org/repos/asf/mesos/blob/06209e61/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 8cc987c..65586a5 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -434,6 +434,55 @@ bool operator==(
 
 
 bool operator==(
+    const OfferOperationStatus& left,
+    const OfferOperationStatus& right)
+{
+  // An optional field matches when it is unset on both sides, or set on both
+  // sides with the same value.
+  if (left.has_operation_id() != right.has_operation_id() ||
+      (left.has_operation_id() &&
+       left.operation_id() != right.operation_id())) {
+    return false;
+  }
+
+  if (left.state() != right.state()) {
+    return false;
+  }
+
+  if (left.has_message() != right.has_message() ||
+      (left.has_message() && left.message() != right.message())) {
+    return false;
+  }
+
+  // Converted resources are compared through the `Resources` wrapper rather
+  // than element by element on the repeated field.
+  if (Resources(left.converted_resources()) !=
+      Resources(right.converted_resources())) {
+    return false;
+  }
+
+  return left.has_status_uuid() == right.has_status_uuid() &&
+    (!left.has_status_uuid() || left.status_uuid() == right.status_uuid());
+}
+
+
+bool operator!=(
+    const OfferOperationStatus& left,
+    const OfferOperationStatus& right)
+{
+  // Defined as the negation of `operator==` so the two can never disagree.
+  const bool equal = left == right;
+  return !equal;
+}
+
+
+bool operator==(
     const ResourceStatistics& left,
     const ResourceStatistics& right)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/06209e61/src/messages/messages.cpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.cpp b/src/messages/messages.cpp
index 6029502..47f7dee 100644
--- a/src/messages/messages.cpp
+++ b/src/messages/messages.cpp
@@ -55,13 +55,62 @@ bool operator!=(
 }
 
 
+bool operator==(
+    const OfferOperationStatusUpdate& left,
+    const OfferOperationStatusUpdate& right)
+{
+  // Optional fields must agree on presence and, when present, on value.
+  if (left.has_framework_id() != right.has_framework_id() ||
+      (left.has_framework_id() &&
+       left.framework_id() != right.framework_id())) {
+    return false;
+  }
+
+  if (left.has_slave_id() != right.has_slave_id() ||
+      (left.has_slave_id() && left.slave_id() != right.slave_id())) {
+    return false;
+  }
+
+  if (left.status() != right.status()) {
+    return false;
+  }
+
+  if (left.has_latest_status() != right.has_latest_status() ||
+      (left.has_latest_status() &&
+       left.latest_status() != right.latest_status())) {
+    return false;
+  }
+
+  // The operation UUID bytes are compared last.
+  return left.operation_uuid() == right.operation_uuid();
+}
+
+
+bool operator!=(
+    const OfferOperationStatusUpdate& left,
+    const OfferOperationStatusUpdate& right)
+{
+  // Defined as the negation of `operator==` so the two can never disagree.
+  const bool equal = left == right;
+  return !equal;
+}
+
+
 ostream& operator<<(ostream& stream, const StatusUpdate& update)
 {
   stream << update.status().state();
 
   if (update.has_uuid()) {
-    stream << " (UUID: " << stringify(UUID::fromBytes(update.uuid()).get())
-           << ")";
+    stream << " (Status UUID: "
+           << stringify(UUID::fromBytes(update.uuid()).get()) << ")";
   }
 
   stream << " for task " << update.status().task_id();
@@ -75,6 +124,36 @@ ostream& operator<<(ostream& stream, const StatusUpdate& update)
 }
 
 
+ostream& operator<<(ostream& stream, const OfferOperationStatusUpdate& update)
+{
+  const OfferOperationStatus& status = update.status();
+
+  // Print the state first, then the UUID of this particular status, if any.
+  //
+  // NOTE: UUID bytes are decoded with an unchecked `Try::get()`, so this
+  // relies on callers only passing updates with well-formed UUIDs.
+  stream << status.state();
+
+  if (status.has_status_uuid()) {
+    stream << " (Status UUID: "
+           << stringify(UUID::fromBytes(status.status_uuid()).get()) << ")";
+  }
+
+  // The operation UUID is always printed; the framework-supplied operation ID
+  // only when it is present.
+  stream << " for operation UUID "
+         << stringify(UUID::fromBytes(update.operation_uuid()).get());
+
+  if (status.has_operation_id()) {
+    stream << " (framework-supplied ID '" << status.operation_id() << "')";
+  }
+
+  if (update.has_framework_id()) {
+    stream << " of framework '" << update.framework_id() << "'";
+  }
+
+  if (update.has_slave_id()) {
+    stream << " on agent " << update.slave_id();
+  }
+
+  return stream;
+}
+
+
 ostream& operator<<(ostream& stream, const StatusUpdateRecord::Type& type)
 {
   return stream

http://git-wip-us.apache.org/repos/asf/mesos/blob/06209e61/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 2756eba..60b1065 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -45,11 +45,26 @@ bool operator!=(
     const ResourceVersionUUID& right);
 
 
+bool operator==(
+    const OfferOperationStatusUpdate& left,
+    const OfferOperationStatusUpdate& right);
+
+
+bool operator!=(
+    const OfferOperationStatusUpdate& left,
+    const OfferOperationStatusUpdate& right);
+
+
 std::ostream& operator<<(std::ostream& stream, const StatusUpdate& update);
 
 
 std::ostream& operator<<(
     std::ostream& stream,
+    const OfferOperationStatusUpdate& update);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
     const StatusUpdateRecord::Type& type);
 
 } // namespace internal {


[4/4] mesos git commit: Implemented the `OfferOperationStatusUpdateManager`.

Posted by gr...@apache.org.
Implemented the `OfferOperationStatusUpdateManager`.

This class will handle the offer operation status updates generated by
the agent and by resource providers.

Review: https://reviews.apache.org/r/64096/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/88445916
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/88445916
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/88445916

Branch: refs/heads/master
Commit: 8844591685c7395552bc740954bdf83ff0fc67d0
Parents: 7ce169c
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Thu Dec 7 19:59:51 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Dec 7 20:42:24 2017 -0800

----------------------------------------------------------------------
 src/CMakeLists.txt                              |   4 +
 src/Makefile.am                                 |   4 +
 src/status_update_manager/offer_operation.cpp   | 130 +++
 src/status_update_manager/offer_operation.hpp   | 121 +++
 src/tests/CMakeLists.txt                        |   1 +
 ...er_operation_status_update_manager_tests.cpp | 796 +++++++++++++++++++
 6 files changed, 1056 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 35a602d..65f1949 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -364,6 +364,9 @@ set(SECRET_SRC
 set(STATE_SRC
   state/in_memory.cpp)
 
+set(STATUS_UPDATE_MANAGER_SRC
+  status_update_manager/offer_operation.cpp)
+
 if (NOT WIN32)
   list(APPEND STATE_SRC
     state/leveldb.cpp
@@ -431,6 +434,7 @@ set(MESOS_SRC
   ${SCHEDULER_SRC}
   ${SECRET_SRC}
   ${STATE_SRC}
+  ${STATUS_UPDATE_MANAGER_SRC}
   ${URI_SRC}
   ${USAGE_SRC}
   ${V1_SRC}

http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index f948960..1620b2a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1072,6 +1072,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/containerizer/mesos/provisioner/docker/registry_puller.cpp	\
   slave/containerizer/mesos/provisioner/docker/store.cpp		\
   slave/resource_estimators/noop.cpp					\
+  status_update_manager/offer_operation.cpp				\
   uri/fetcher.cpp							\
   uri/utils.cpp								\
   uri/fetchers/copy.cpp							\
@@ -1229,6 +1230,8 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/containerizer/mesos/provisioner/docker/store.hpp		\
   slave/qos_controllers/noop.hpp					\
   slave/resource_estimators/noop.hpp					\
+  status_update_manager/status_update_manager_process.hpp		\
+  status_update_manager/offer_operation.hpp				\
   tests/active_user_test_helper.hpp					\
   tests/allocator.hpp							\
   tests/cluster.hpp							\
@@ -2473,6 +2476,7 @@ mesos_tests_SOURCES =						\
   tests/mock_registrar.cpp					\
   tests/module.cpp						\
   tests/module_tests.cpp					\
+  tests/offer_operation_status_update_manager_tests.cpp		\
   tests/oversubscription_tests.cpp				\
   tests/partition_tests.cpp					\
   tests/paths_tests.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
new file mode 100644
index 0000000..f66690e
--- /dev/null
+++ b/src/status_update_manager/offer_operation.cpp
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "status_update_manager/offer_operation.hpp"
+
+#include <list>
+
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/uuid.hpp>
+
+using lambda::function;
+
+using process::Future;
+using process::Owned;
+using process::wait; // Necessary on some OS's to disambiguate.
+
+namespace mesos {
+namespace internal {
+
+namespace {
+
+// The process type behind `OfferOperationStatusUpdateManager`: status update
+// streams keyed by operation UUID, carrying `OfferOperationStatusUpdate`s and
+// using `OfferOperationStatusUpdateRecord` as the record type. Aliased here so
+// the member-function pointers below don't repeat the template arguments.
+typedef StatusUpdateManagerProcess<
+    UUID,
+    OfferOperationStatusUpdateRecord,
+    OfferOperationStatusUpdate> OperationStatusUpdateManagerProcess;
+
+} // namespace {
+
+
+OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
+  : process(new OperationStatusUpdateManagerProcess())
+{
+  spawn(process.get());
+}
+
+
+// Terminates the process and blocks until it has exited, so that no work
+// dispatched to it runs after the manager is destroyed.
+OfferOperationStatusUpdateManager::~OfferOperationStatusUpdateManager()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+void OfferOperationStatusUpdateManager::initialize(
+    const function<void(const OfferOperationStatusUpdate&)>& forward,
+    const function<const std::string(const UUID&)>& getPath)
+{
+  dispatch(
+      process.get(),
+      &OperationStatusUpdateManagerProcess::initialize,
+      forward,
+      getPath);
+}
+
+
+Future<Nothing> OfferOperationStatusUpdateManager::update(
+    const OfferOperationStatusUpdate& update,
+    bool checkpoint)
+{
+  // The operation UUID selects the stream this update belongs to. Updates can
+  // come from resource providers as well as from the agent, so a malformed
+  // UUID is reported through the returned future instead of aborting the
+  // whole process.
+  Try<UUID> operationUuid = UUID::fromBytes(update.operation_uuid());
+  if (operationUuid.isError()) {
+    return process::Failure(
+        "Failed to parse operation UUID of offer operation status update: " +
+        operationUuid.error());
+  }
+
+  return dispatch(
+      process.get(),
+      &OperationStatusUpdateManagerProcess::update,
+      update,
+      operationUuid.get(),
+      checkpoint);
+}
+
+
+Future<bool> OfferOperationStatusUpdateManager::acknowledgement(
+    const UUID& operationUuid,
+    const UUID& statusUuid)
+{
+  return dispatch(
+      process.get(),
+      &OperationStatusUpdateManagerProcess::acknowledgement,
+      operationUuid,
+      statusUuid);
+}
+
+
+Future<OfferOperationStatusManagerState>
+OfferOperationStatusUpdateManager::recover(
+    const std::list<UUID>& operationUuids,
+    bool strict)
+{
+  return dispatch(
+      process.get(),
+      &OperationStatusUpdateManagerProcess::recover,
+      operationUuids,
+      strict);
+}
+
+
+void OfferOperationStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
+{
+  dispatch(
+      process.get(),
+      &OperationStatusUpdateManagerProcess::cleanup,
+      frameworkId);
+}
+
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/status_update_manager/offer_operation.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.hpp b/src/status_update_manager/offer_operation.hpp
new file mode 100644
index 0000000..8751f0b
--- /dev/null
+++ b/src/status_update_manager/offer_operation.hpp
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __STATUS_UPDATE_MANAGER_OFFER_OPERATION_HPP__
+#define __STATUS_UPDATE_MANAGER_OFFER_OPERATION_HPP__
+
+#include <list>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/messages.hpp"
+
+#include "status_update_manager/status_update_manager_process.hpp"
+
+namespace mesos {
+namespace internal {
+
+// State returned by `OfferOperationStatusUpdateManager::recover()`. It holds
+// the recovered stream states keyed by operation UUID (`streams`) and the
+// number of recoverable errors encountered (`errors`).
+typedef StatusUpdateManagerProcess<
+    UUID,
+    OfferOperationStatusUpdateRecord,
+    OfferOperationStatusUpdate>::State OfferOperationStatusManagerState;
+
+// Manages offer operation status update streams, one per operation UUID.
+// Updates are optionally checkpointed, forwarded through a caller-supplied
+// callback, and resent until acknowledged. Every call is dispatched to an
+// internal libprocess process that is spawned on construction and terminated
+// (and waited for) on destruction.
+class OfferOperationStatusUpdateManager
+{
+public:
+  OfferOperationStatusUpdateManager();
+  ~OfferOperationStatusUpdateManager();
+
+  // Not copyable: each instance owns its own process.
+  OfferOperationStatusUpdateManager(
+      const OfferOperationStatusUpdateManager& that) = delete;
+  OfferOperationStatusUpdateManager& operator=(
+      const OfferOperationStatusUpdateManager& that) = delete;
+
+  // Expects two callbacks:
+  //   `forward`: called in order to forward an offer operation status update
+  //              to its recipient.
+  //   `getPath`: called in order to generate the path of a status update stream
+  //              file, given the operation's `operation_uuid`.
+  void initialize(
+      const lambda::function<void(const OfferOperationStatusUpdate&)>& forward,
+      const lambda::function<const std::string(const UUID&)>& getPath);
+
+  // Checkpoints the update if `checkpoint` is true and reliably sends it,
+  // i.e., resends it until it is acknowledged.
+  //
+  // Returns a future that becomes ready once the update has been handled
+  // successfully (e.g., checkpointed); a failed future means it was not.
+  process::Future<Nothing> update(
+      const OfferOperationStatusUpdate& update,
+      bool checkpoint = true);
+
+  // Checkpoints the acknowledgement to disk if necessary.
+  // Also, sends the next pending status update, if any.
+  //
+  // Returns:
+  //   - `true`: if the ACK is handled successfully (e.g., checkpointed)
+  //             and the status update stream is not terminated.
+  //   - `false`: same as above except the status update stream is terminated.
+  //   - A `Failure`: if there are any errors (e.g., duplicate, checkpointing).
+  process::Future<bool> acknowledgement(
+      const UUID& operationUuid,
+      const UUID& statusUuid);
+
+  // Recovers status updates. The provided list of `operationUuids` is used as
+  // the source of truth for which checkpointed files should be recovered.
+  //
+  // Returns the recovered state, including a map from operation UUID to the
+  // stream state recovered from the corresponding status updates file.
+  //
+  // The stream state will be `None` if:
+  //
+  //   * The status updates file didn't exist.
+  //   * The status updates file was empty.
+  //
+  // NOTE(review): the `RecoverNotCheckpointedStream` test expects a failed
+  // future when a never-checkpointed stream is recovered with `strict` set to
+  // true; confirm how the first bullet above interacts with `strict`.
+  //
+  // The stream state contains all the status updates (both acknowledged and
+  // pending) added to the stream.
+  //
+  // The returned state also contains a count of the recoverable errors found
+  // during non-strict recovery (i.e., when `strict` is false).
+  process::Future<OfferOperationStatusManagerState> recover(
+      const std::list<UUID>& operationUuids,
+      bool strict);
+
+  // Closes all the status update streams corresponding to this framework.
+  //
+  // NOTE: This stops retrying any pending status updates for this framework,
+  // but does NOT garbage collect any checkpointed state. The caller is
+  // responsible for garbage collection after this method has returned.
+  void cleanup(const FrameworkID& frameworkId);
+
+private:
+  process::Owned<
+      StatusUpdateManagerProcess<
+          UUID,
+          OfferOperationStatusUpdateRecord,
+          OfferOperationStatusUpdate>> process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATUS_UPDATE_MANAGER_OFFER_OPERATION_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 92db731..1c95755 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -101,6 +101,7 @@ set(MESOS_TESTS_SRC
   http_fault_tolerance_tests.cpp
   master_maintenance_tests.cpp
   master_slave_reconciliation_tests.cpp
+  offer_operation_status_update_manager_tests.cpp
   partition_tests.cpp
   paths_tests.cpp
   protobuf_io_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/88445916/src/tests/offer_operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp b/src/tests/offer_operation_status_update_manager_tests.cpp
new file mode 100644
index 0000000..a31a525
--- /dev/null
+++ b/src/tests/offer_operation_status_update_manager_tests.cpp
@@ -0,0 +1,796 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/uuid.hpp>
+
+#include "slave/constants.hpp"
+
+#include "tests/mesos.hpp"
+#include "tests/utils.hpp"
+
+#include "status_update_manager/offer_operation.hpp"
+
+using lambda::function;
+
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Promise;
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// Receives the `forward` callback of every offer operation status update
+// manager created by this test suite. As a gmock mock, it lets each test set
+// expectations on how many (and which) status updates get forwarded.
+struct MockOfferOperationStatusUpdateProcessor
+{
+  MOCK_METHOD1(update, void(const OfferOperationStatusUpdate&));
+};
+
+
+class OfferOperationStatusUpdateManagerTest : public MesosTest
+{
+protected:
+  OfferOperationStatusUpdateManagerTest()
+  {
+    // Pause the clock so that retries only happen when a test advances it.
+    Clock::pause();
+
+    resetStatusUpdateManager();
+  }
+
+  ~OfferOperationStatusUpdateManagerTest()
+  {
+    // Tear the manager down explicitly, while the clock is still paused and
+    // before `statusUpdateProcessor` is destroyed, so that its `forward`
+    // callback can never run against a destroyed mock.
+    statusUpdateManager.reset();
+
+    Clock::resume();
+  }
+
+  // Builds an update for `operationUuid` whose status carries `state` and
+  // `statusUuid`; `frameworkId` is set only when provided.
+  OfferOperationStatusUpdate createOfferOperationStatusUpdate(
+      const UUID& statusUuid,
+      const UUID& operationUuid,
+      const OfferOperationState& state,
+      const Option<FrameworkID>& frameworkId = None())
+  {
+    OfferOperationStatusUpdate statusUpdate;
+
+    statusUpdate.set_operation_uuid(operationUuid.toBytes());
+
+    if (frameworkId.isSome()) {
+      statusUpdate.mutable_framework_id()->CopyFrom(frameworkId.get());
+    }
+
+    OfferOperationStatus* status = statusUpdate.mutable_status();
+    status->set_state(state);
+    status->set_status_uuid(statusUuid.toBytes());
+
+    return statusUpdate;
+  }
+
+  // (Re)creates the status update manager and wires its `forward` callback to
+  // `statusUpdateProcessor`. Replacing the manager drops its in-memory
+  // streams, which the recovery tests use to simulate a restart.
+  void resetStatusUpdateManager()
+  {
+    statusUpdateManager.reset(new OfferOperationStatusUpdateManager());
+
+    // Capturing `this` is safe: the fixture outlives every manager it creates
+    // (see the destructor and the member declaration order below).
+    const function<void(const OfferOperationStatusUpdate&)> forward =
+      [this](const OfferOperationStatusUpdate& update) {
+        statusUpdateProcessor.update(update);
+      };
+
+    statusUpdateManager->initialize(
+        forward, OfferOperationStatusUpdateManagerTest::getPath);
+  }
+
+  // Path of the stream file for `operationUuid`, under the current working
+  // directory.
+  static const string getPath(const UUID& operationUuid)
+  {
+    return path::join(os::getcwd(), "streams", operationUuid.toString());
+  }
+
+  // NOTE: `statusUpdateProcessor` is declared before `statusUpdateManager`
+  // so that it is destroyed after it; the manager's `forward` callback calls
+  // into the processor.
+  MockOfferOperationStatusUpdateProcessor statusUpdateProcessor;
+  Owned<OfferOperationStatusUpdateManager> statusUpdateManager;
+};
+
+
+// Verifies that once a terminal offer operation status update has been
+// acknowledged, the status update manager stops retrying it.
+TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck)
+{
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  const OfferOperationStatusUpdate statusUpdate =
+    createOfferOperationStatusUpdate(
+        statusUuid,
+        operationUuid,
+        OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // The update must be forwarded exactly once.
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  // Acknowledging a terminal update terminates the stream, which
+  // `acknowledgement()` reports by returning `false`.
+  AWAIT_EXPECT_FALSE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+  // Move past the minimum retry interval; an acknowledged update must not be
+  // forwarded again (gmock would report the extra call).
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// Verifies that once a non-terminal offer operation status update has been
+// acknowledged, the status update manager stops retrying it.
+TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
+{
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  const OfferOperationStatusUpdate statusUpdate =
+    createOfferOperationStatusUpdate(
+        statusUuid,
+        operationUuid,
+        OfferOperationState::OFFER_OPERATION_PENDING);
+
+  // The update must be forwarded exactly once.
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  // A non-terminal update leaves the stream open, so `acknowledgement()`
+  // returns `true`.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+  // Move past the minimum retry interval; an acknowledged update must not be
+  // forwarded again (gmock would report the extra call).
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// Verifies that the status update manager keeps resending an offer operation
+// status update until it is acknowledged.
+TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
+{
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  const OfferOperationStatusUpdate statusUpdate =
+    createOfferOperationStatusUpdate(
+        statusUuid,
+        operationUuid,
+        OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Expect the initial send plus exactly one retry.
+  Future<OfferOperationStatusUpdate> firstForward;
+  Future<OfferOperationStatusUpdate> secondForward;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&firstForward))
+    .WillOnce(FutureArg<0>(&secondForward));
+
+  // Checkpoint the update and wait for the initial forward.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+  AWAIT_EXPECT_EQ(statusUpdate, firstForward);
+
+  // The clock is paused, so no retry can have happened yet.
+  EXPECT_FALSE(secondForward.isReady());
+
+  // Reaching the minimum retry interval triggers a resend.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  AWAIT_EXPECT_EQ(statusUpdate, secondForward);
+
+  // Terminal update: acknowledging it terminates the stream, hence `false`.
+  AWAIT_EXPECT_FALSE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+  // After the acknowledgement, advancing the clock must not cause a third
+  // forward.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// Verifies that once a framework's streams have been cleaned up, the status
+// update manager stops resending that framework's unacknowledged updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup)
+{
+  FrameworkID frameworkId;
+  frameworkId.set_value("frameworkId");
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  const OfferOperationStatusUpdate statusUpdate =
+    createOfferOperationStatusUpdate(
+        statusUuid,
+        operationUuid,
+        OfferOperationState::OFFER_OPERATION_FINISHED,
+        frameworkId);
+
+  // Only the initial forward is expected; a retry would over-saturate this.
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  // Close the framework's streams without acknowledging the update.
+  statusUpdateManager->cleanup(frameworkId);
+
+  // Had the stream survived the cleanup, this would trigger a retry.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// This test verifies that the status update manager is able to recover
+// checkpointed status updates, and that it resends the recovered updates that
+// haven't been acknowledged.
+TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+
+  // Simulate a restart: the new manager has no in-memory streams.
+  resetStatusUpdateManager();
+
+  // Advance the clock past the retry interval. The new manager has not
+  // recovered the stream yet, so nothing should be resent.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+  // Recover the checkpointed stream.
+  Future<OfferOperationStatusManagerState> state =
+    statusUpdateManager->recover({operationUuid}, true);
+
+  AWAIT_READY(state);
+
+  EXPECT_EQ(0u, state->errors);
+
+  // The checks below dereference the recovered stream, so a missing stream
+  // must abort the test (ASSERT) instead of continuing into `.at()`/`get()`.
+  ASSERT_TRUE(state->streams.contains(operationUuid));
+  ASSERT_SOME(state->streams.at(operationUuid));
+
+  const auto& stream = state->streams.at(operationUuid).get();
+
+  // Guard `front()`: calling it on an empty container is undefined behavior.
+  ASSERT_FALSE(stream.updates.empty());
+  EXPECT_EQ(statusUpdate, stream.updates.front());
+
+  // The stream should NOT be terminated.
+  EXPECT_FALSE(stream.terminated);
+
+  // Check that the status update is resent.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+}
+
+
+// This test verifies that the status update manager returns a `Failure` when
+// trying to recover a stream that isn't checkpointed.
+TEST_F(OfferOperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a non-checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  // Verify that the stream file is NOT created.
+  EXPECT_FALSE(os::exists(getPath(operationUuid)));
+
+  resetStatusUpdateManager();
+
+  // Trying to recover the non-checkpointed stream should fail.
+  AWAIT_EXPECT_FAILED(statusUpdateManager->recover({operationUuid}, true));
+}
+
+
+// This test verifies that the status update manager doesn't return a
+// `Failure` when trying to recover a stream from an empty file.
+//
+// This can happen when the checkpointing failed between opening the file and
+// writing the first update. In this case `recover()` should succeed, but return
+// `None` as the operation's state.
+TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyFile)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  resetStatusUpdateManager();
+
+  // Advance the clock enough to trigger a retry if the update hasn't been
+  // cleaned up.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Truncate the status updates file to zero bytes, simulating a failure in
+  // checkpointing, where the file was created, but the status update manager
+  // crashed before any data was written to disk.
+  //
+  // Every step is checked: using `fd.get()` on a failed `os::open()` would
+  // abort the test binary instead of reporting a test failure.
+  Try<int_fd> fd = os::open(getPath(operationUuid), O_RDWR);
+  ASSERT_SOME(fd);
+  ASSERT_SOME(os::ftruncate(fd.get(), 0));
+  ASSERT_SOME(os::close(fd.get()));
+
+  // The recovery of the empty stream should not fail, but the stream state
+  // should be empty.
+  Future<OfferOperationStatusManagerState> state =
+    statusUpdateManager->recover({operationUuid}, false);
+
+  AWAIT_READY(state);
+
+  EXPECT_EQ(0u, state->errors);
+
+  // Fatal, since `hashmap::at()` below throws on a missing key.
+  ASSERT_TRUE(state->streams.contains(operationUuid));
+  EXPECT_NONE(state->streams.at(operationUuid));
+}
+
+
+// This test verifies that the status update manager doesn't return a `Failure`
+// when trying to recover a stream from a parent directory that doesn't contain
+// the status updates file.
+//
+// This can happen when the checkpointing failed after creating the parent
+// directory, but before the file was opened. In this case `recover()` should
+// succeed, but return `None` as the operation's state.
+TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyDirectory)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  resetStatusUpdateManager();
+
+  // Advance the clock enough to trigger a retry if the update hasn't been
+  // cleaned up.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Leave the parent directory, but remove the status updates file, simulating
+  // a failure in checkpointing, where the parent directory was created, but the
+  // checkpointing process crashed before opening the status updates file.
+  ASSERT_SOME(os::rm(getPath(operationUuid)));
+
+  // The recovery of the empty stream should not fail, but the stream state
+  // should be empty.
+  Future<OfferOperationStatusManagerState> state =
+    statusUpdateManager->recover({operationUuid}, false);
+
+  AWAIT_READY(state);
+
+  EXPECT_EQ(0u, state->errors);
+
+  // Fatal, since `hashmap::at()` below throws on a missing key.
+  ASSERT_TRUE(state->streams.contains(operationUuid));
+  EXPECT_NONE(state->streams.at(operationUuid));
+}
+
+
+// This test verifies that the status update manager is able to recover a
+// terminated stream, and that the stream's state reflects that it is
+// terminated.
+TEST_F(OfferOperationStatusUpdateManagerTest, RecoverTerminatedStream)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+
+  // Acknowledge the update, this is a terminal update, so `acknowledgement`
+  // should return `false`.
+  AWAIT_EXPECT_FALSE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid));
+
+  resetStatusUpdateManager();
+
+  // Recover the checkpointed stream.
+  Future<OfferOperationStatusManagerState> state =
+    statusUpdateManager->recover({operationUuid}, true);
+
+  AWAIT_ASSERT_READY(state);
+
+  EXPECT_EQ(0u, state->errors);
+
+  // These checks are fatal on purpose: `hashmap::at()` throws on a missing
+  // key, a `None` stream state cannot be dereferenced, and calling `front()`
+  // on an empty list is undefined behavior.
+  ASSERT_TRUE(state->streams.contains(operationUuid));
+  ASSERT_SOME(state->streams.at(operationUuid));
+
+  // The stream should be terminated.
+  EXPECT_TRUE(state->streams.at(operationUuid)->terminated);
+
+  ASSERT_FALSE(state->streams.at(operationUuid)->updates.empty());
+
+  const OfferOperationStatusUpdate& recoveredUpdate =
+    state->streams.at(operationUuid)->updates.front();
+
+  EXPECT_EQ(statusUpdate, recoveredUpdate);
+
+  // Advance the clock, the status update has been acknowledged, so it shouldn't
+  // trigger a retry.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// This test verifies that the status update manager silently ignores duplicate
+// updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
+{
+  const UUID operationId = UUID::random();
+  const UUID statusId = UUID::random();
+
+  // A non-terminal update, so the stream stays open after it is acknowledged.
+  OfferOperationStatusUpdate pendingUpdate = createOfferOperationStatusUpdate(
+      statusId,
+      operationId,
+      OfferOperationState::OFFER_OPERATION_PENDING);
+
+  // `WillOnce` without `WillRepeatedly` means exactly one forward is allowed;
+  // forwarding the duplicate below would fail the test.
+  Future<OfferOperationStatusUpdate> forwarded;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwarded));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(pendingUpdate, true));
+  AWAIT_EXPECT_EQ(pendingUpdate, forwarded);
+
+  // Non-terminal update: the acknowledgement reports an open stream.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+
+  // Sending the very same update again is accepted, but silently dropped.
+  AWAIT_EXPECT_READY(statusUpdateManager->update(pendingUpdate, true));
+
+  // Give a wrongly scheduled retry the chance to fire.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// This test verifies that after recovery the status update manager still
+// silently ignores duplicate updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
+{
+  const UUID operationId = UUID::random();
+  const UUID statusId = UUID::random();
+
+  // A non-terminal update, so the stream stays open after it is acknowledged.
+  OfferOperationStatusUpdate pendingUpdate = createOfferOperationStatusUpdate(
+      statusId,
+      operationId,
+      OfferOperationState::OFFER_OPERATION_PENDING);
+
+  // Exactly one forward is allowed: the original send. The duplicate sent
+  // after recovery must not reach the processor.
+  Future<OfferOperationStatusUpdate> forwarded;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwarded));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(pendingUpdate, true));
+  AWAIT_EXPECT_EQ(pendingUpdate, forwarded);
+
+  // Non-terminal update: the acknowledgement reports an open stream.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+
+  // Reset the manager and rebuild its state from the checkpoint.
+  resetStatusUpdateManager();
+  AWAIT_ASSERT_READY(statusUpdateManager->recover({operationId}, true));
+
+  // The recovered stream must still recognize the update as a duplicate.
+  AWAIT_EXPECT_READY(statusUpdateManager->update(pendingUpdate, true));
+
+  // Give a wrongly scheduled retry the chance to fire.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// This test verifies that the status update manager rejects duplicated
+// acknowledgements.
+TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAck)
+{
+  const UUID operationId = UUID::random();
+  const UUID statusId = UUID::random();
+
+  // A non-terminal update, so the stream stays open after it is acknowledged.
+  OfferOperationStatusUpdate pendingUpdate = createOfferOperationStatusUpdate(
+      statusId,
+      operationId,
+      OfferOperationState::OFFER_OPERATION_PENDING);
+
+  Future<OfferOperationStatusUpdate> forwarded;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwarded));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(pendingUpdate, true));
+  AWAIT_EXPECT_EQ(pendingUpdate, forwarded);
+
+  // First acknowledgement: accepted, and the stream remains open.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+
+  // An acknowledged update must not be retried.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Second acknowledgement of the same update: rejected.
+  AWAIT_EXPECT_FAILED(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+}
+
+
+// This test verifies that after recovery the status update manager still
+// rejects duplicated acknowledgements.
+TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
+{
+  const UUID operationId = UUID::random();
+  const UUID statusId = UUID::random();
+
+  // A non-terminal update, so the stream stays open after it is acknowledged.
+  OfferOperationStatusUpdate pendingUpdate = createOfferOperationStatusUpdate(
+      statusId,
+      operationId,
+      OfferOperationState::OFFER_OPERATION_PENDING);
+
+  Future<OfferOperationStatusUpdate> forwarded;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwarded));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(pendingUpdate, true));
+  AWAIT_EXPECT_EQ(pendingUpdate, forwarded);
+
+  // First acknowledgement: accepted, and the stream remains open.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+
+  // Reset the manager and rebuild its state from the checkpoint.
+  resetStatusUpdateManager();
+  AWAIT_ASSERT_READY(statusUpdateManager->recover({operationId}, true));
+
+  // The recovered stream must remember the acknowledgement and reject a
+  // second one for the same update.
+  AWAIT_EXPECT_FAILED(
+      statusUpdateManager->acknowledgement(operationId, statusId));
+
+  // An acknowledged update must not be retried.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+}
+
+
+// This test verifies that non-strict recovery of a status updates file
+// containing a corrupted record at the end succeeds.
+TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
+
+  const UUID operationUuid = UUID::random();
+  const UUID statusUuid = UUID::random();
+
+  OfferOperationStatusUpdate statusUpdate = createOfferOperationStatusUpdate(
+      statusUuid,
+      operationUuid,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+
+  resetStatusUpdateManager();
+
+  // Advance the clock enough to trigger a retry if the update hasn't been
+  // cleaned up.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // The file should contain only `OfferOperationStatusUpdateRecord` messages.
+  // Write a `OfferOperationStatusUpdate` to the end of the file, so that the
+  // recovery process encounters an error.
+  Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
+  ASSERT_SOME(fd);
+  ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
+  ASSERT_SOME(os::close(fd.get()));
+
+  EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+  // The non-strict recovery of the corrupted stream should not fail, but the
+  // state should reflect that the status update file contained a corrupted
+  // record.
+  Future<OfferOperationStatusManagerState> state =
+    statusUpdateManager->recover({operationUuid}, false);
+
+  AWAIT_READY(state);
+
+  EXPECT_EQ(1u, state->errors);
+
+  // These checks are fatal on purpose: `hashmap::at()` throws on a missing
+  // key, a `None` stream state cannot be dereferenced, and calling `front()`
+  // on an empty list is undefined behavior.
+  ASSERT_TRUE(state->streams.contains(operationUuid));
+  ASSERT_SOME(state->streams.at(operationUuid));
+  ASSERT_FALSE(state->streams.at(operationUuid)->updates.empty());
+
+  // Check that the status update could be recovered.
+  const OfferOperationStatusUpdate& recoveredUpdate =
+    state->streams.at(operationUuid)->updates.front();
+
+  EXPECT_EQ(statusUpdate, recoveredUpdate);
+
+  // Check that the status update is resent.
+  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+}
+
+
+// This test verifies that strict recovery of a status updates file containing
+// a corrupted record at the end fails.
+TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
+{
+  const UUID operationId = UUID::random();
+  const UUID statusId = UUID::random();
+
+  OfferOperationStatusUpdate finishedUpdate = createOfferOperationStatusUpdate(
+      statusId,
+      operationId,
+      OfferOperationState::OFFER_OPERATION_FINISHED);
+
+  Future<OfferOperationStatusUpdate> forwarded;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwarded));
+
+  // Checkpoint the update and wait for it to reach the processor.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(finishedUpdate, true));
+  AWAIT_EXPECT_EQ(finishedUpdate, forwarded);
+
+  resetStatusUpdateManager();
+
+  // No retry may happen after the reset; a second forward would violate the
+  // single `WillOnce` expectation above.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Corrupt the checkpoint. It is expected to hold only
+  // `OfferOperationStatusUpdateRecord` messages, so appending a raw
+  // `OfferOperationStatusUpdate` makes recovery run into an error.
+  Try<int_fd> checkpointFd =
+    os::open(getPath(operationId), O_APPEND | O_RDWR);
+  ASSERT_SOME(checkpointFd);
+
+  ASSERT_SOME(::protobuf::write(checkpointFd.get(), finishedUpdate));
+  ASSERT_SOME(os::close(checkpointFd.get()));
+
+  // Strict recovery must refuse the corrupted stream.
+  AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationId}, true));
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {


[3/4] mesos git commit: Added a generic actor to be used by status update managers.

Posted by gr...@apache.org.
Added a generic actor to be used by status update managers.

This actor handles the checkpointing, recovery, and retry of status
updates.

It will initially be used by the offer operation status update
manager, but it was designed and implemented so that it can replace
the current implementation of the task status update manager.

Review: https://reviews.apache.org/r/64095/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ce169cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ce169cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ce169cf

Branch: refs/heads/master
Commit: 7ce169cf0895cdf512be4cc756a0ae67a402cebe
Parents: 680ccee
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Thu Dec 7 19:59:49 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Dec 7 20:42:14 2017 -0800

----------------------------------------------------------------------
 src/slave/constants.hpp                         |   2 +
 .../status_update_manager_process.hpp           | 983 +++++++++++++++++++
 2 files changed, 985 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ce169cf/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index 2d07bce..e6cb7cc 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -50,6 +50,8 @@ constexpr Duration DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5);
 
 constexpr Duration RECOVERY_TIMEOUT = Minutes(15);
 
+// TODO(gkleiman): Move these to a different file once `TaskStatusUpdateManager`
+// uses `StatusUpdateManagerProcess`. See MESOS-8296.
 constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MIN = Seconds(10);
 constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MAX = Minutes(10);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ce169cf/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
new file mode 100644
index 0000000..1ac6441
--- /dev/null
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -0,0 +1,983 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __STATUS_UPDATE_MANAGER_PROCESS_HPP__
+#define __STATUS_UPDATE_MANAGER_PROCESS_HPP__
+
+#include <list>
+#include <queue>
+#include <string>
+#include <utility>
+
+#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/protobuf.hpp>
+#include <process/timeout.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+#include <stout/try.hpp>
+#include <stout/utils.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "slave/constants.hpp"
+
+namespace mesos {
+namespace internal {
+
+// `StatusUpdateManagerProcess` is responsible for
+//
+// 1) Reliably sending status updates.
+// 2) Checkpointing updates to disk (optional).
+// 3) Receiving ACKs.
+// 4) Recovering checkpointed status updates after failover.
+//
+// It takes the following template parameters:
+//  - `IDType` the type of the objects used to identify the managed streams.
+//  - `CheckpointType` the type of the protobuf message written to checkpoint
+//    the streams.
+//  - `UpdateType` the type of the status updates that will be managed.
+//
+// NOTE: Unless first paused, this actor will forward updates as soon as
+// possible; for example, during recovery or as soon as the first status update
+// is processed.
+//
+// This process does NOT garbage collect any checkpointed state. The users of it
+// are responsible for the garbage collection of the status updates files.
+//
+// TODO(gkleiman): make `TaskStatusUpdateManager` use this actor (MESOS-8296).
+template <typename IDType, typename CheckpointType, typename UpdateType>
+class StatusUpdateManagerProcess
+  : public ProtobufProcess<
+        StatusUpdateManagerProcess<IDType, CheckpointType, UpdateType>>
+{
+public:
+  // Result of `recover()`.
+  //
+  // `streams` maps a stream ID to the state recovered from that stream's
+  // status updates file: every status update (acknowledged or still pending)
+  // that was added to the stream, and whether the stream was terminated.
+  // The mapped value is `None` when the status updates file did not exist or
+  // was empty.
+  //
+  // `errors` counts the recoverable errors found during non-strict recovery.
+  // A stream whose recovery failed outright gets no entry in `streams`; a
+  // stream recovered despite an error gets an entry and is also counted.
+  struct State
+  {
+    struct StreamState
+    {
+      StreamState() : terminated(false) {}
+
+      std::list<UpdateType> updates;
+      bool terminated;
+    };
+
+    State() : errors(0) {}
+
+    hashmap<IDType, Option<StreamState>> streams;
+    unsigned int errors;
+  };
+
+  // The actor ID is generated from the "status-update-manager" prefix.
+  // Updates are forwarded as soon as possible until `pause()` is called.
+  StatusUpdateManagerProcess()
+    : process::ProcessBase(process::ID::generate("status-update-manager")),
+      paused(false)
+  {}
+
+  // Non-copyable.
+  StatusUpdateManagerProcess(const StatusUpdateManagerProcess& other) = delete;
+  StatusUpdateManagerProcess& operator=(
+      const StatusUpdateManagerProcess& other) = delete;
+
+  // Implementation.
+
+  // Explicitly use `initialize` since we're overloading below.
+  using process::ProcessBase::initialize;
+
+  // Installs the callbacks this actor relies on.
+  //
+  //   `_forwardCallback` receives every status update that has to be
+  //   forwarded.
+  //   `_getPath` maps a stream ID to the path of that stream's status updates
+  //   (checkpoint) file; it is used when a checkpointed stream is created.
+  void initialize(
+      const lambda::function<void(const UpdateType&)>& _forwardCallback,
+      const lambda::function<const std::string(const IDType&)>& _getPath)
+  {
+    getPath = _getPath;
+    forwardCallback = _forwardCallback;
+  }
+
+  // Adds `update` to the stream identified by `streamId`, creating the stream
+  // on first use. The update is forwarded right away if it is the only pending
+  // update of the stream and the actor is not paused; later updates are
+  // forwarded from `acknowledgement()`.
+  //
+  // When `checkpoint` is `false` the update is retried while it stays in
+  // memory, but it is never checkpointed. An update whose `checkpoint` value
+  // or framework ID disagrees with its stream is rejected with a `Failure`.
+  // Duplicate updates are accepted without being forwarded.
+  process::Future<Nothing> update(
+      const UpdateType& update,
+      const IDType& streamId,
+      bool checkpoint)
+  {
+    LOG(INFO) << "Received status update " << update;
+
+    if (!streams.contains(streamId)) {
+      Option<FrameworkID> frameworkId = None();
+      if (update.has_framework_id()) {
+        frameworkId = update.framework_id();
+      }
+
+      Try<Nothing> created =
+        createStatusUpdateStream(streamId, frameworkId, checkpoint);
+      if (created.isError()) {
+        return process::Failure(created.error());
+      }
+    }
+
+    CHECK(streams.contains(streamId));
+    StatusUpdateStream* const stream = streams[streamId].get();
+
+    // The checkpoint flag must match the one the stream was created with.
+    if (checkpoint != stream->checkpointed()) {
+      return process::Failure(
+          "Mismatched checkpoint value for status update " +
+          stringify(update) + " (expected checkpoint=" +
+          stringify(stream->checkpointed()) + " actual checkpoint=" +
+          stringify(checkpoint) + ")");
+    }
+
+    // Either both the update and the stream carry a framework ID, or neither
+    // does...
+    const bool streamHasFrameworkId = stream->frameworkId.isSome();
+    if (update.has_framework_id() != streamHasFrameworkId) {
+      const std::string expected = streamHasFrameworkId
+        ? stringify(stream->frameworkId.get())
+        : "no framework ID";
+      const std::string actual = update.has_framework_id()
+        ? stringify(update.framework_id())
+        : "no framework ID";
+
+      return process::Failure(
+          "Mismatched framework ID for status update " + stringify(update) +
+          " (expected " + expected + " got " + actual + ")");
+    }
+
+    // ... and when both do, the IDs must be equal.
+    if (update.has_framework_id() &&
+        stream->frameworkId.get() != update.framework_id()) {
+      return process::Failure(
+          "Mismatched framework ID for status update " + stringify(update) +
+          " (expected " + stringify(stream->frameworkId.get()) +
+          " actual " + stringify(update.framework_id()) + ")");
+    }
+
+    Try<bool> handled = stream->update(update);
+    if (handled.isError()) {
+      return process::Failure(handled.error());
+    }
+
+    // `false` signals a duplicate update, which is accepted but not forwarded.
+    if (!handled.get()) {
+      return Nothing();
+    }
+
+    // Only forward if this update just became the single pending update and
+    // sending is not paused.
+    if (paused || stream->pending.size() != 1) {
+      return Nothing();
+    }
+
+    CHECK_NONE(stream->timeout);
+
+    const Result<UpdateType>& head = stream->next();
+    if (head.isError()) {
+      return process::Failure(head.error());
+    }
+
+    CHECK_SOME(head);
+    stream->timeout =
+      forward(streamId, head.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+    return Nothing();
+  }
+
+  // Handles the acknowledgement of status update `uuid` on stream `streamId`.
+  // Unless paused, the next pending update of the stream is then forwarded.
+  //
+  // Returns:
+  //  - `true` if the acknowledgement was handled (e.g., checkpointed) and the
+  //    stream is not terminated.
+  //  - `false` if the acknowledgement was handled and the stream is
+  //    terminated; the stream is cleaned up in this case.
+  //  - A `Failure` for an unknown stream, a duplicate acknowledgement, or any
+  //    other error (e.g., while checkpointing).
+  process::Future<bool> acknowledgement(
+      const IDType& streamId,
+      const UUID& uuid)
+  {
+    LOG(INFO) << "Received status update acknowledgement (UUID: " << uuid << ")"
+              << " for stream " << stringify(streamId);
+
+    // The stream may be unknown if recovery has not completed yet, or if it
+    // has already been cleaned up.
+    if (!streams.contains(streamId)) {
+      return process::Failure(
+          "Cannot find the status update stream " + stringify(streamId));
+    }
+
+    StatusUpdateStream* const stream = streams[streamId].get();
+
+    Try<bool> acked = stream->acknowledgement(uuid);
+    if (acked.isError()) {
+      return process::Failure(acked.error());
+    }
+
+    if (!acked.get()) {
+      return process::Failure("Duplicate status update acknowledgement");
+    }
+
+    // Drop the retry timeout of the update that was just acknowledged.
+    stream->timeout = None();
+
+    const Result<UpdateType>& next = stream->next();
+    if (next.isError()) {
+      return process::Failure(next.error());
+    }
+
+    const bool terminated = stream->terminated;
+
+    if (!terminated) {
+      if (!paused && next.isSome()) {
+        stream->timeout = forward(
+            streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+      }
+      return true;
+    }
+
+    if (next.isSome()) {
+      LOG(WARNING) << "Acknowledged a terminal status update but updates are"
+                   << " still pending";
+    }
+
+    // NOTE(review): this presumably releases the stream; `stream` is not
+    // touched after this call.
+    cleanupStatusUpdateStream(streamId);
+
+    return false;
+  }
+
+  // Recovers the status update manager's state using the supplied stream IDs.
+  //
+  // Returns:
+  //  - The recovered state if successful.
+  //  - The recovered state, including the number of errors encountered, if
+  //    `strict == false` and any of the streams couldn't be recovered. Streams
+  //    that failed to recover have no entry in `State::streams`.
+  //  - A `Failure` if `strict == true` and any of the streams couldn't be
+  //    recovered. Every stream managed by this actor is cleaned up first.
+  process::Future<State> recover(
+      const std::list<IDType>& streamIds,
+      bool strict)
+  {
+    LOG(INFO) << "Recovering status update manager";
+
+    State state;
+    foreach (const IDType& streamId, streamIds) {
+      Result<typename StatusUpdateStream::State> result =
+        recoverStatusUpdateStream(streamId, strict);
+
+      if (result.isError()) {
+        const std::string message =
+          "Failed to recover status update stream " +
+          stringify(streamId) + ": " + result.error();
+        LOG(WARNING) << message;
+
+        if (strict) {
+          // Iterate over a copy, since cleaning up a stream removes it from
+          // `streams` (see the `CHECK` below). The loop variable is named
+          // differently so that it doesn't shadow `streamId`.
+          foreachkey (const IDType& managedStreamId, utils::copy(streams)) {
+            cleanupStatusUpdateStream(managedStreamId);
+          }
+
+          CHECK(streams.empty());
+          CHECK(frameworkStreams.empty());
+
+          return process::Failure(message);
+        }
+
+        state.errors++;
+      } else if (result.isNone()) {
+        // This can happen if the initial checkpoint of the stream didn't
+        // complete.
+        state.streams[streamId] = None();
+      } else {
+        const typename StatusUpdateStream::State& recovered = result.get();
+
+        typename State::StreamState streamState;
+        streamState.updates = recovered.updates;
+        streamState.terminated = recovered.terminated;
+        state.streams[streamId] = streamState;
+
+        // The stream was recovered, but a recoverable error was found.
+        if (recovered.error) {
+          state.errors++;
+        }
+      }
+    }
+
+    return state;
+  }
+
+  // Stops managing every status update stream that belongs to `frameworkId`.
+  //
+  // NOTE: Pending status updates of those streams are no longer retried, but
+  // their checkpointed state is NOT garbage collected; that remains the
+  // caller's responsibility once this method has returned.
+  void cleanup(const FrameworkID& frameworkId)
+  {
+    LOG(INFO) << "Closing status update streams for framework"
+              << " '" << frameworkId << "'";
+
+    if (!frameworkStreams.contains(frameworkId)) {
+      return;
+    }
+
+    // Iterate over a copy, since `cleanupStatusUpdateStream()` presumably
+    // removes entries from `frameworkStreams`.
+    foreach (const IDType& streamId,
+             utils::copy(frameworkStreams[frameworkId])) {
+      cleanupStatusUpdateStream(streamId);
+    }
+  }
+
+  // Stops forwarding (and retrying) status updates until `resume()` is called.
+  void pause()
+  {
+    LOG(INFO) << "Pausing sending status updates";
+
+    paused = true;
+  }
+
+  // Re-enables forwarding and immediately sends the head of every stream's
+  // pending queue, restarting that stream's retry timer.
+  void resume()
+  {
+    LOG(INFO) << "Resuming sending status updates";
+    paused = false;
+
+    foreachpair (const IDType& streamId,
+                 process::Owned<StatusUpdateStream>& stream,
+                 streams) {
+      const Result<UpdateType>& head = stream->next();
+
+      // Streams with nothing pending (or in an error state) are left alone.
+      if (head.isSome()) {
+        LOG(WARNING) << "Sending status update " << head.get();
+
+        stream->timeout = forward(
+            streamId, head.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+      }
+    }
+  }
+
+private:
+  // Forward declarations.
+  class StatusUpdateStream;
+
+  // Helper methods.
+
+  // Creates a new status update stream for `streamId` and registers it in
+  // `streams` (and, when `frameworkId` is set, in `frameworkStreams`). When
+  // `checkpoint` is true the stream persists to `getPath(streamId)`.
+  Try<Nothing> createStatusUpdateStream(
+      const IDType& streamId,
+      const Option<FrameworkID>& frameworkId,
+      bool checkpoint)
+  {
+    VLOG(1) << "Creating status update stream " << stringify(streamId)
+            << " checkpoint=" << stringify(checkpoint);
+
+    Option<std::string> path = None();
+    if (checkpoint) {
+      path = getPath(streamId);
+    }
+
+    Try<process::Owned<StatusUpdateStream>> created =
+      StatusUpdateStream::create(streamId, frameworkId, path);
+
+    if (created.isError()) {
+      return Error(created.error());
+    }
+
+    streams[streamId] = std::move(created.get());
+
+    if (frameworkId.isSome()) {
+      frameworkStreams[frameworkId.get()].insert(streamId);
+    }
+
+    return Nothing();
+  }
+
+
+  // Recovers the status update stream `streamId` from its checkpoint file.
+  // Unless the stream has already terminated, it is registered in `streams`
+  // (and `frameworkStreams`), and its next pending update is forwarded when
+  // the manager is not paused.
+  //
+  // Returns the recovered stream state, `None` if there was nothing to
+  // recover, or an `Error`. On error nothing is registered.
+  Result<typename StatusUpdateStream::State> recoverStatusUpdateStream(
+      const IDType& streamId,
+      bool strict)
+  {
+    VLOG(1) << "Recovering status update stream " << stringify(streamId);
+
+    Result<std::pair<
+        process::Owned<StatusUpdateStream>,
+        typename StatusUpdateStream::State>> result =
+          StatusUpdateStream::recover(streamId, getPath(streamId), strict);
+
+    if (result.isError()) {
+      return Error(result.error());
+    }
+
+    if (result.isNone()) {
+      return None();
+    }
+
+    process::Owned<StatusUpdateStream> stream = std::get<0>(result.get());
+    typename StatusUpdateStream::State& streamState = std::get<1>(result.get());
+
+    // Terminated streams are reported to the caller but not tracked further.
+    if (stream->terminated) {
+      return streamState;
+    }
+
+    // Get the next update in the queue. This is checked BEFORE the stream is
+    // registered anywhere, so that an error cannot leave an entry in
+    // `frameworkStreams` without a matching entry in `streams` (which would
+    // leak, and trip `CHECK(frameworkStreams.empty())` in a strict recovery).
+    const Result<UpdateType>& next = stream->next();
+    if (next.isError()) {
+      return Error(next.error());
+    }
+
+    if (stream->frameworkId.isSome()) {
+      frameworkStreams[stream->frameworkId.get()].insert(streamId);
+    }
+
+    if (!paused && next.isSome()) {
+      // Forward the next queued status update.
+      stream->timeout =
+        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+    }
+
+    streams[streamId] = std::move(stream);
+
+    return streamState;
+  }
+
+  // Removes stream `streamId` from `streams` and from its framework's set in
+  // `frameworkStreams`, dropping that set once it becomes empty. The stream
+  // must exist.
+  void cleanupStatusUpdateStream(const IDType& streamId)
+  {
+    VLOG(1) << "Cleaning up status update stream " << stringify(streamId);
+
+    CHECK(streams.contains(streamId)) << "Cannot find the status update stream "
+                                      << stringify(streamId);
+
+    const Option<FrameworkID> frameworkId = streams[streamId]->frameworkId;
+
+    if (frameworkId.isSome()) {
+      CHECK(frameworkStreams.contains(frameworkId.get()));
+
+      hashset<IDType>& ids = frameworkStreams[frameworkId.get()];
+      ids.erase(streamId);
+
+      if (ids.empty()) {
+        frameworkStreams.erase(frameworkId.get());
+      }
+    }
+
+    streams.erase(streamId);
+  }
+
+  // Hands `update` to `forwardCallback` and schedules `timeout(streamId,
+  // duration)` to run after `duration`, so the update can be resent if no ACK
+  // arrives in time. Returns the timeout of that scheduled call. Must not be
+  // called while paused.
+  process::Timeout forward(
+      const IDType& streamId,
+      const UpdateType& update,
+      const Duration& duration)
+  {
+    CHECK(!paused);
+
+    VLOG(1) << "Forwarding status update " << update;
+
+    forwardCallback(update);
+
+    // Message ourselves after `duration` to resend if no ACK was received.
+    process::Timer retryTimer = delay(
+        duration,
+        ProtobufProcess<
+            StatusUpdateManagerProcess<
+            IDType,
+            CheckpointType,
+            UpdateType>>::self(),
+        &StatusUpdateManagerProcess::timeout,
+        streamId,
+        duration);
+
+    return retryTimer.timeout();
+  }
+
+  // Retry timer callback scheduled by `forward()`. If the stream still has a
+  // pending update and its current timeout has expired, resends the head of
+  // the queue with the retry interval doubled, capped at
+  // `slave::STATUS_UPDATE_RETRY_INTERVAL_MAX`. Does nothing while paused or
+  // if the stream is gone.
+  void timeout(const IDType& streamId, const Duration& duration)
+  {
+    if (paused || !streams.contains(streamId)) {
+      return;
+    }
+
+    StatusUpdateStream* stream = streams[streamId].get();
+
+    if (stream->pending.empty()) {
+      return;
+    }
+
+    CHECK_SOME(stream->timeout);
+
+    // Only resend once the stream's current timeout has expired.
+    if (!stream->timeout->expired()) {
+      return;
+    }
+
+    const UpdateType& update = stream->pending.front();
+    LOG(WARNING) << "Resending status update " << update;
+
+    // Bounded exponential backoff.
+    const Duration nextInterval =
+      std::min(duration * 2, slave::STATUS_UPDATE_RETRY_INTERVAL_MAX);
+
+    stream->timeout = forward(streamId, update, nextInterval);
+  }
+
+  // Invoked by `forward()` to actually send a status update.
+  lambda::function<void(UpdateType)> forwardCallback;
+  // Returns the checkpoint file path of a stream, given its ID.
+  lambda::function<const std::string(const IDType&)> getPath;
+
+  // Status update streams currently managed, keyed by stream ID.
+  hashmap<IDType, process::Owned<StatusUpdateStream>> streams;
+  // IDs of the streams in `streams` that belong to each framework.
+  hashmap<FrameworkID, hashset<IDType>> frameworkStreams;
+  // While `true`, status updates are neither forwarded nor retried.
+  bool paused;
+
+  // Handles the status updates and acknowledgements, checkpointing them if
+  // necessary. It also holds the information about received, acknowledged and
+  // pending status updates.
+  //
+  // A checkpointing stream owns the file descriptor of its updates file and
+  // closes it on destruction, so streams are neither copyable nor assignable;
+  // they are always handled through a `process::Owned`.
+  class StatusUpdateStream
+  {
+  public:
+    // State recovered from a stream's checkpoint file.
+    struct State
+    {
+      // The status updates read from the checkpoint file, in file order.
+      std::list<UpdateType> updates;
+
+      // Set to `true` if the checkpoint file could not be fully read
+      // (only possible with non-strict recovery).
+      bool error;
+      bool terminated; // Set to `true` if a terminal status update was ACK'ed.
+
+      State() : updates(), error(false), terminated(false) {}
+    };
+
+    // Copying would make two objects close the same file descriptor.
+    StatusUpdateStream(const StatusUpdateStream&) = delete;
+    StatusUpdateStream& operator=(const StatusUpdateStream&) = delete;
+
+    // Closes the updates file, if one is open. A failure to close is only
+    // logged.
+    ~StatusUpdateStream()
+    {
+      if (fd.isSome()) {
+        Try<Nothing> close = os::close(fd.get());
+
+        if (close.isError()) {
+          CHECK_SOME(path);
+          LOG(WARNING) << "Failed to close file '" << path.get()
+                       << "': " << close.error();
+        }
+      }
+    }
+
+    // Creates a new stream. If `path` is set, the stream checkpoints to that
+    // file, which must not exist yet; its parent directory is created if
+    // needed.
+    static Try<process::Owned<StatusUpdateStream>> create(
+        const IDType& streamId,
+        const Option<FrameworkID>& frameworkId,
+        const Option<std::string>& path)
+    {
+      Option<int_fd> fd;
+
+      if (path.isSome()) {
+        if (os::exists(path.get())) {
+          return Error(
+              "The status updates file '" + path.get() + "' already exists.");
+        }
+
+        // Create the base updates directory, if it doesn't exist.
+        const std::string& dirName = Path(path.get()).dirname();
+        Try<Nothing> directory = os::mkdir(dirName);
+        if (directory.isError()) {
+          return Error(
+              "Failed to create '" + dirName + "': " + directory.error());
+        }
+
+        // Open the updates file.
+        Try<int_fd> result = os::open(
+            path.get(),
+            O_CREAT | O_SYNC | O_WRONLY | O_CLOEXEC,
+            S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+        if (result.isError()) {
+          return Error(
+              "Failed to open '" + path.get() +
+              "' for status updates: " + result.error());
+        }
+
+        fd = result.get();
+      }
+
+      process::Owned<StatusUpdateStream> stream(
+          new StatusUpdateStream(streamId, path, fd));
+
+      stream->frameworkId = frameworkId;
+
+      return std::move(stream);
+    }
+
+
+    // Recovers a stream by replaying the checkpoint file at `path`.
+    //
+    // Returns `None` if there is nothing to recover: the file does not exist
+    // while its directory does, or it contains no updates (the file is then
+    // removed). The file is always truncated after the last valid record. A
+    // partially readable file is an `Error` if `strict`, otherwise it only
+    // sets `State::error`.
+    static Result<std::pair<process::Owned<StatusUpdateStream>, State>>
+    recover(const IDType& streamId, const std::string& path, bool strict)
+    {
+      if (os::exists(Path(path).dirname()) && !os::exists(path)) {
+        // This could happen if the process died before it checkpointed any
+        // status updates.
+        return None();
+      }
+
+      // Open the status updates file for reading and writing.
+      Try<int_fd> fd = os::open(path, O_SYNC | O_RDWR | O_CLOEXEC);
+
+      if (fd.isError()) {
+        return Error(
+            "Failed to open status updates stream file '" + path +
+            "': " + fd.error());
+      }
+
+      process::Owned<StatusUpdateStream> stream(
+          new StatusUpdateStream(streamId, path, fd.get()));
+
+      VLOG(1) << "Replaying updates for stream " << stringify(streamId);
+
+      // Read the updates/acknowledgments, building both the stream's in-memory
+      // structures and the state object which will be returned.
+
+      State state;
+      Result<CheckpointType> record = None();
+      while (true) {
+        // Ignore errors due to partial protobuf read and enable undoing failed
+        // reads by reverting to the previous seek position.
+        record = ::protobuf::read<CheckpointType>(fd.get(), true, true);
+
+        if (!record.isSome()) {
+          break;
+        }
+
+        switch (record->type()) {
+          case CheckpointType::ACK: {
+            // Get the corresponding update for this ACK.
+            const Result<UpdateType>& update = stream->next();
+            if (update.isError()) {
+              return Error(update.error());
+            }
+
+            if (update.isNone()) {
+              // The UUID is only used for the message; don't crash if the
+              // checkpointed bytes are not a valid UUID.
+              Try<UUID> uuid = UUID::fromBytes(record->uuid());
+              return Error(
+                  "Unexpected status update acknowledgment"
+                  " (UUID: " +
+                  (uuid.isSome() ? uuid->toString() : std::string("<invalid>")) +
+                  ") for stream " + stringify(streamId));
+            }
+            stream->_handle(update.get(), record->type());
+            break;
+          }
+          case CheckpointType::UPDATE: {
+            stream->_handle(record->update(), record->type());
+            state.updates.push_back(record->update());
+            break;
+          }
+        }
+      }
+
+      // Always truncate the file to contain only valid updates.
+      // NOTE: This is safe even though we ignore partial protobuf read
+      // errors above, because the `fd` is properly set to the end of the
+      // last valid update by `protobuf::read()`.
+      Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR);
+      if (currentPosition.isError()) {
+        return Error(
+            "Failed to lseek status updates stream file '" + path +
+            "': " + currentPosition.error());
+      }
+
+      Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get());
+
+      if (truncated.isError()) {
+        return Error(
+            "Failed to truncate status updates file '" + path +
+            "': " + truncated.error());
+      }
+
+      // After reading a non-corrupted updates file, `record` should be `none`.
+      if (record.isError()) {
+        std::string message =
+          "Failed to read status updates file '" + path +
+          "': " + record.error();
+
+        if (strict) {
+          return Error(message);
+        }
+
+        LOG(WARNING) << message;
+        state.error = true;
+      }
+
+      state.terminated = stream->terminated;
+
+      if (state.updates.empty()) {
+        // A stream is created only once there's something to write to it, so
+        // this can only happen if the checkpointing of the first update was
+        // interrupted.
+        Try<Nothing> removed = os::rm(path);
+
+        if (removed.isError()) {
+          return Error(
+              "Failed to remove status updates file '" + path +
+              "': " + removed.error());
+        }
+
+        return None();
+      }
+
+      return std::make_pair(stream, state);
+    }
+
+    // This function handles the update, checkpointing if necessary.
+    //
+    // Returns `true`:  if the update is successfully handled.
+    //         `false`: if the update is a duplicate or has already been
+    //                  acknowledged.
+    //         `Error`: any errors (e.g., checkpointing, or a missing or
+    //                  malformed status UUID).
+    Try<bool> update(const UpdateType& update)
+    {
+      if (error.isSome()) {
+        return Error(error.get());
+      }
+
+      // TODO(gkleiman): This won't work with `StatusUpdate`, because the field
+      // containing the status update uuid has a different name. In order to
+      // make the `TaskStatusUpdateManager` use this process, we should avoid
+      // depending on identical field names.
+      if (!update.status().has_status_uuid()) {
+        return Error("Status update is missing 'status_uuid'");
+      }
+
+      // The update comes from outside this process, so a malformed UUID is
+      // reported as an error instead of aborting.
+      Try<UUID> statusUuid = UUID::fromBytes(update.status().status_uuid());
+      if (statusUuid.isError()) {
+        return Error(
+            "Status update has an invalid 'status_uuid': " +
+            statusUuid.error());
+      }
+
+      // Check that this status update has not already been acknowledged.
+      if (acknowledged.contains(statusUuid.get())) {
+        LOG(WARNING) << "Ignoring status update " << update
+                     << " that has already been acknowledged";
+        return false;
+      }
+
+      // Check that this update has not already been received.
+      if (received.contains(statusUuid.get())) {
+        LOG(WARNING) << "Ignoring duplicate status update " << update;
+        return false;
+      }
+
+      // Handle the update, checkpointing if necessary.
+      Try<Nothing> result = handle(update, CheckpointType::UPDATE);
+      if (result.isError()) {
+        return Error(result.error());
+      }
+
+      return true;
+    }
+
+    // This function handles the ACK, checkpointing if necessary.
+    //
+    // Returns `true`: if the acknowledgement is successfully handled.
+    //         `false`: if the acknowledgement is a duplicate.
+    //         `Error`: Any errors (e.g., checkpointing).
+    Try<bool> acknowledgement(const UUID& statusUuid)
+    {
+      if (error.isSome()) {
+        return Error(error.get());
+      }
+
+      // Get the corresponding update for this ACK.
+      const Result<UpdateType>& _update = next();
+      if (_update.isError()) {
+        return Error(_update.error());
+      }
+
+      // This might happen if we retried a status update and got back
+      // acknowledgments for both the original and the retried update.
+      if (_update.isNone()) {
+        return Error(
+            "Unexpected status update acknowledgment (UUID: " +
+            statusUuid.toString() + ") for stream " + stringify(streamId));
+      }
+
+      const UpdateType& update = _update.get();
+
+      if (acknowledged.contains(statusUuid)) {
+        LOG(WARNING) << "Duplicate status update acknowledgment"
+                     << " for update " << update;
+        return false;
+      }
+
+      // TODO(gkleiman): This won't work with `StatusUpdate`, because the field
+      // containing the status update uuid has a different name. In order to
+      // make the `TaskStatusUpdateManager` use this process, we should avoid
+      // depending on identical field names.
+      Try<UUID> updateStatusUuid =
+        UUID::fromBytes(update.status().status_uuid());
+      CHECK_SOME(updateStatusUuid);
+
+      // This might happen if we retried a status update and got back
+      // acknowledgments for both the original and the retried update.
+      if (statusUuid != updateStatusUuid.get()) {
+        LOG(WARNING) << "Unexpected status update acknowledgement"
+                     << " (received " << statusUuid << ", expecting "
+                     << updateStatusUuid.get() << ") for update " << update;
+        return false;
+      }
+
+      // Handle the ACK, checkpointing if necessary.
+      Try<Nothing> result = handle(update, CheckpointType::ACK);
+      if (result.isError()) {
+        return Error(result.error());
+      }
+
+      return true;
+    }
+
+    // Returns the next update (or none, if empty) in the queue.
+    Result<UpdateType> next()
+    {
+      if (error.isSome()) {
+        return Error(error.get());
+      }
+
+      if (!pending.empty()) {
+        return pending.front();
+      }
+
+      return None();
+    }
+
+    // Returns `true` if the stream is checkpointed, `false` otherwise.
+    bool checkpointed() const { return path.isSome(); }
+
+    bool terminated;
+    Option<FrameworkID> frameworkId;
+    Option<process::Timeout> timeout; // Timeout for resending status update.
+    std::queue<UpdateType> pending;
+
+  private:
+    StatusUpdateStream(
+        const IDType& _streamId,
+        const Option<std::string>& _path,
+        Option<int_fd> _fd)
+      : terminated(false), streamId(_streamId), path(_path), fd(_fd) {}
+
+    // Handles the status update and writes it to disk, if necessary.
+    //
+    // TODO(vinod): The write has to be asynchronous to avoid status updates
+    // that are being checkpointed, blocking the processing of other updates.
+    // One solution is to wrap the protobuf::write inside async, but it's
+    // probably too much of an overhead to spin up a new libprocess per status
+    // update?
+    // A better solution might be to be have async write capability for file IO.
+    Try<Nothing> handle(
+        const UpdateType& update,
+        const typename CheckpointType::Type& type)
+    {
+      CHECK_NONE(error);
+
+      // Checkpoint the update if necessary.
+      if (checkpointed()) {
+        LOG(INFO) << "Checkpointing " << type << " for status update "
+                  << update;
+
+        CHECK_SOME(fd);
+
+        CheckpointType record;
+        record.set_type(type);
+
+        switch (type) {
+          case CheckpointType::UPDATE:
+            record.mutable_update()->CopyFrom(update);
+            break;
+          case CheckpointType::ACK:
+            // TODO(gkleiman): This won't work with `StatusUpdate`, because the
+            // field containing the status update uuid has a different name.
+            // In order to make the `TaskStatusUpdateManager` use this process,
+            // we should avoid depending on identical field names.
+            record.set_uuid(update.status().status_uuid());
+            break;
+        }
+
+        Try<Nothing> write = ::protobuf::write(fd.get(), record);
+        if (write.isError()) {
+          // A failed write leaves the stream in a non-retryable error state.
+          // The message names what was actually being written.
+          error =
+            std::string("Failed to write ") +
+            (type == CheckpointType::ACK ? "acknowledgement for " : "") +
+            "status update " + stringify(update) + " to '" + path.get() +
+            "': " + write.error();
+          return Error(error.get());
+        }
+      }
+
+      // Now actually handle the update.
+      _handle(update, type);
+
+      return Nothing();
+    }
+
+
+    // Handles the status update without checkpointing.
+    void _handle(
+        const UpdateType& update,
+        const typename CheckpointType::Type& type)
+    {
+      CHECK_NONE(error);
+
+      // TODO(gkleiman): This won't work with `StatusUpdate`, because the field
+      // containing the status update uuid has a different name.  In order to
+      // make the `TaskStatusUpdateManager` use this process, we should avoid
+      // depending on identical field names.
+      Try<UUID> statusUuid = UUID::fromBytes(update.status().status_uuid());
+      CHECK_SOME(statusUuid);
+
+      switch (type) {
+        case CheckpointType::UPDATE:
+          if (update.has_framework_id()) {
+            frameworkId = update.framework_id();
+          }
+
+          received.insert(statusUuid.get());
+
+          // Add it to the pending updates queue.
+          pending.push(update);
+          break;
+        case CheckpointType::ACK:
+          acknowledged.insert(statusUuid.get());
+
+          // Remove the corresponding update from the pending queue.
+          pending.pop();
+
+          if (!terminated) {
+            terminated = protobuf::isTerminalState(update.status().state());
+          }
+          break;
+      }
+    }
+
+    const IDType streamId;
+
+    const Option<std::string> path; // File path of the update stream.
+    const Option<int_fd> fd; // File descriptor to the update stream.
+
+    hashset<UUID> received;
+    hashset<UUID> acknowledged;
+
+    Option<std::string> error; // Potential non-retryable error.
+  };
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATUS_UPDATE_MANAGER_PROCESS_HPP__


[2/4] mesos git commit: Added the `OfferOperationStatusUpdateRecord` protobuf message.

Posted by gr...@apache.org.
Added the `OfferOperationStatusUpdateRecord` protobuf message.

This protobuf message is used to checkpoint offer operation status
updates and acknowledgments.

Review: https://reviews.apache.org/r/64094/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/680ccee6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/680ccee6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/680ccee6

Branch: refs/heads/master
Commit: 680ccee60c29747a1e929e4b7bb8dbed52162222
Parents: 06209e6
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Thu Dec 7 19:59:48 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Dec 7 20:42:13 2017 -0800

----------------------------------------------------------------------
 src/messages/messages.cpp   | 10 ++++++++++
 src/messages/messages.hpp   |  5 +++++
 src/messages/messages.proto | 21 +++++++++++++++++++++
 3 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/680ccee6/src/messages/messages.cpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.cpp b/src/messages/messages.cpp
index 47f7dee..56876f2 100644
--- a/src/messages/messages.cpp
+++ b/src/messages/messages.cpp
@@ -160,5 +160,15 @@ ostream& operator<<(ostream& stream, const StatusUpdateRecord::Type& type)
     << StatusUpdateRecord::Type_descriptor()->FindValueByNumber(type)->name();
 }
 
+
+// Prints the symbolic name of `type` (e.g. `UPDATE` or `ACK`).
+ostream& operator<<(
+    ostream& stream,
+    const OfferOperationStatusUpdateRecord::Type& type)
+{
+  const std::string& name =
+    OfferOperationStatusUpdateRecord::Type_descriptor()
+      ->FindValueByNumber(type)
+      ->name();
+
+  return stream << name;
+}
+
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/680ccee6/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 60b1065..8e42893 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -67,6 +67,11 @@ std::ostream& operator<<(
     std::ostream& stream,
     const StatusUpdateRecord::Type& type);
 
+
+std::ostream& operator<<(
+    std::ostream& stream,
+    const OfferOperationStatusUpdateRecord::Type& type);
+
 } // namespace internal {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/680ccee6/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 7ab07d7..1a70967 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -753,6 +753,27 @@ message OfferOperationStatusUpdate {
 
 
 /**
+ * Encapsulates how we checkpoint an `OfferOperationStatusUpdate` to disk.
+ *
+ * See the `OfferOperationStatusUpdateManager`.
+ */
+message OfferOperationStatusUpdateRecord {
+  // What this checkpoint record captures.
+  enum Type {
+    UPDATE = 0; // A status update; see `update`.
+    ACK = 1;    // An acknowledgment of a status update; see `uuid`.
+  }
+
+  required Type type = 1;
+
+  // Required if type == UPDATE.
+  optional OfferOperationStatusUpdate update = 2;
+
+  // Required if type == ACK.
+  //
+  // NOTE: When this message is used as the `CheckpointType` of
+  // `StatusUpdateManagerProcess`, this holds the raw bytes of the
+  // acknowledged update's `status.status_uuid`, as parsed back by
+  // `UUID::fromBytes()`.
+  optional bytes uuid = 3;
+}
+
+
+/**
  * This message is sent from the master to the resource provider
  * manager (either on the agent for local resource providers, or on
  * the master for external resource providers) to apply an offer