You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/13 00:59:18 UTC

[1/7] mesos git commit: Added pause/resume methods to `OfferOperationStatusUpdateManager`.

Repository: mesos
Updated Branches:
  refs/heads/master c320ab3b2 -> d0d5f5c46


Added pause/resume methods to `OfferOperationStatusUpdateManager`.

Review: https://reviews.apache.org/r/64517/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/07630ccf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/07630ccf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/07630ccf

Branch: refs/heads/master
Commit: 07630ccfaefccb500d13f8940d9d3f397bdef32b
Parents: c320ab3
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:12 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:31 2017 -0800

----------------------------------------------------------------------
 src/status_update_manager/offer_operation.cpp | 22 ++++++++++++++++++++++
 src/status_update_manager/offer_operation.hpp | 10 ++++++++++
 2 files changed, 32 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/07630ccf/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index f66690e..984969e 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -126,5 +126,27 @@ void OfferOperationStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
       frameworkId);
 }
 
+
+void OfferOperationStatusUpdateManager::pause()
+{
+  dispatch(
+      process.get(),
+      &StatusUpdateManagerProcess<
+          UUID,
+          OfferOperationStatusUpdateRecord,
+          OfferOperationStatusUpdate>::pause);
+}
+
+
+void OfferOperationStatusUpdateManager::resume()
+{
+  dispatch(
+      process.get(),
+      &StatusUpdateManagerProcess<
+          UUID,
+          OfferOperationStatusUpdateRecord,
+          OfferOperationStatusUpdate>::resume);
+}
+
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/07630ccf/src/status_update_manager/offer_operation.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.hpp b/src/status_update_manager/offer_operation.hpp
index 8751f0b..e7c2b0c 100644
--- a/src/status_update_manager/offer_operation.hpp
+++ b/src/status_update_manager/offer_operation.hpp
@@ -44,7 +44,11 @@ typedef StatusUpdateManagerProcess<
 class OfferOperationStatusUpdateManager
 {
 public:
+  // NOTE: Unless first paused, the status update manager will forward updates
+  // as soon as possible; for example, during recovery or as soon as the first
+  // status update is processed.
   OfferOperationStatusUpdateManager();
+
   ~OfferOperationStatusUpdateManager();
 
   OfferOperationStatusUpdateManager(
@@ -107,6 +111,12 @@ public:
   // responsible for garbage collection after this method has returned.
   void cleanup(const FrameworkID& frameworkId);
 
+  // Stop forwarding status updates until `resume()` is called.
+  void pause();
+
+  // Resume forwarding status updates until `pause()` is called.
+  void resume();
+
 private:
   process::Owned<
       StatusUpdateManagerProcess<


[7/7] mesos git commit: Made the `StatusUpdateManagerProcess` process ID configurable.

Posted by gr...@apache.org.
Made the `StatusUpdateManagerProcess` process ID configurable.

Review: https://reviews.apache.org/r/64509/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d7da9b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d7da9b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d7da9b7

Branch: refs/heads/master
Commit: 9d7da9b77ad33530e0a436614479cafd08c13ec3
Parents: e70097c
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:19 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800

----------------------------------------------------------------------
 src/status_update_manager/offer_operation.cpp               | 4 +++-
 src/status_update_manager/status_update_manager_process.hpp | 6 ++++--
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d7da9b7/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index 8ffce61..986310e 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -38,7 +38,9 @@ OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
         new StatusUpdateManagerProcess<
             UUID,
             OfferOperationStatusUpdateRecord,
-            OfferOperationStatusUpdate>("offer operation status update"))
+            OfferOperationStatusUpdate>(
+                "offer-operation-status-update-manager",
+                "offer operation status update"))
 {
   spawn(process.get());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d7da9b7/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 935f754..8c9e06f 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -108,8 +108,10 @@ public:
     State() : streams(), errors(0) {}
   };
 
-  StatusUpdateManagerProcess(const std::string& _statusUpdateType)
-    : process::ProcessBase(process::ID::generate("status-update-manager")),
+  StatusUpdateManagerProcess(
+      const std::string& id,
+      const std::string& _statusUpdateType)
+    : process::ProcessBase(process::ID::generate(id)),
       statusUpdateType(_statusUpdateType),
       paused(false) {}
 


[2/7] mesos git commit: Improved the logging in `StatusUpdateManagerProcess`.

Posted by gr...@apache.org.
Improved the logging in `StatusUpdateManagerProcess`.

This patch adds the type of status update handled by each
`StatusUpdateManagerProcess` instance to its log and error messages.

Review: https://reviews.apache.org/r/64472/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3fa8d64e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3fa8d64e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3fa8d64e

Branch: refs/heads/master
Commit: 3fa8d64e84861bd6023cdab9296b47ab24f581b8
Parents: 07630cc
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:13 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:46 2017 -0800

----------------------------------------------------------------------
 src/status_update_manager/offer_operation.cpp   |   9 +-
 .../status_update_manager_process.hpp           | 149 +++++++++++--------
 2 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index 984969e..8ffce61 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -34,10 +34,11 @@ namespace mesos {
 namespace internal {
 
 OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
-  : process(new StatusUpdateManagerProcess<
-        UUID,
-        OfferOperationStatusUpdateRecord,
-        OfferOperationStatusUpdate>())
+  : process(
+        new StatusUpdateManagerProcess<
+            UUID,
+            OfferOperationStatusUpdateRecord,
+            OfferOperationStatusUpdate>("offer operation status update"))
 {
   spawn(process.get());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 1ac6441..a44551f 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -108,8 +108,9 @@ public:
     State() : streams(), errors(0) {}
   };
 
-  StatusUpdateManagerProcess()
+  StatusUpdateManagerProcess(const std::string& _statusUpdateType)
     : process::ProcessBase(process::ID::generate("status-update-manager")),
+      statusUpdateType(_statusUpdateType),
       paused(false) {}
 
   StatusUpdateManagerProcess(const StatusUpdateManagerProcess& that) = delete;
@@ -144,7 +145,7 @@ public:
       const IDType& streamId,
       bool checkpoint)
   {
-    LOG(INFO) << "Received status update " << update;
+    LOG(INFO) << "Received " << statusUpdateType << " " << update;
 
     if (!streams.contains(streamId)) {
       Try<Nothing> create =
@@ -166,7 +167,7 @@ public:
     // stream that is checkpointable, and vice-versa.
     if (stream->checkpointed() != checkpoint) {
       return process::Failure(
-          "Mismatched checkpoint value for status update " +
+          "Mismatched checkpoint value for " + statusUpdateType + " " +
           stringify(update) + " (expected checkpoint=" +
           stringify(stream->checkpointed()) + " actual checkpoint=" +
           stringify(checkpoint) + ")");
@@ -176,8 +177,8 @@ public:
     // of the stream.
     if (update.has_framework_id() != stream->frameworkId.isSome()) {
       return process::Failure(
-          "Mismatched framework ID for status update " + stringify(update) +
-          " (expected " +
+          "Mismatched framework ID for " + statusUpdateType +
+          " " + stringify(update) + " (expected " +
           (stream->frameworkId.isSome()
              ? stringify(stream->frameworkId.get())
              : "no framework ID") +
@@ -191,7 +192,8 @@ public:
     if (update.has_framework_id() &&
         update.framework_id() != stream->frameworkId.get()) {
       return process::Failure(
-          "Mismatched framework ID for status update " + stringify(update) +
+          "Mismatched framework ID for " + statusUpdateType +
+          " " + stringify(update) +
           " (expected " + stringify(stream->frameworkId.get()) +
           " actual " + stringify(update.framework_id()) + ")");
     }
@@ -238,14 +240,16 @@ public:
       const IDType& streamId,
       const UUID& uuid)
   {
-    LOG(INFO) << "Received status update acknowledgement (UUID: " << uuid << ")"
+    LOG(INFO) << "Received " << statusUpdateType
+              << " acknowledgement (UUID: " << uuid << ")"
               << " for stream " << stringify(streamId);
 
     // This might happen if we haven't completed recovery yet or if the
     // acknowledgement is for a stream that has been cleaned up.
     if (!streams.contains(streamId)) {
       return process::Failure(
-          "Cannot find the status update stream " + stringify(streamId));
+          "Cannot find the " + statusUpdateType + " stream " +
+          stringify(streamId));
     }
 
     StatusUpdateStream* stream = streams[streamId].get();
@@ -258,7 +262,8 @@ public:
     }
 
     if (!result.get()) {
-      return process::Failure("Duplicate status update acknowledgement");
+      return process::Failure(
+          "Duplicate " + statusUpdateType + " acknowledgement");
     }
 
     stream->timeout = None();
@@ -272,8 +277,8 @@ public:
     bool terminated = stream->terminated;
     if (terminated) {
       if (next.isSome()) {
-        LOG(WARNING) << "Acknowledged a terminal status update but updates are"
-                     << " still pending";
+        LOG(WARNING) << "Acknowledged a terminal " << statusUpdateType
+                     << " but updates are still pending";
       }
       cleanupStatusUpdateStream(streamId);
     } else if (!paused && next.isSome()) {
@@ -297,7 +302,7 @@ public:
       const std::list<IDType>& streamIds,
       bool strict)
   {
-    LOG(INFO) << "Recovering status update manager";
+    LOG(INFO) << "Recovering " << statusUpdateType << " manager";
 
     State state;
     foreach (const IDType& streamId, streamIds) {
@@ -306,7 +311,7 @@ public:
 
       if (result.isError()) {
         const std::string message =
-          "Failed to recover status update stream " +
+          "Failed to recover " + statusUpdateType + " stream " +
           stringify(streamId) + ": " + result.error();
         LOG(WARNING) << message;
 
@@ -349,8 +354,8 @@ public:
   // responsible for garbage collection after this method has returned.
   void cleanup(const FrameworkID& frameworkId)
   {
-    LOG(INFO) << "Closing status update streams for framework"
-              << " '" << frameworkId << "'";
+    LOG(INFO) << "Closing " << statusUpdateType << " streams of framework "
+              << frameworkId;
 
     if (frameworkStreams.contains(frameworkId)) {
       foreach (const IDType& streamId,
@@ -362,13 +367,13 @@ public:
 
   void pause()
   {
-    LOG(INFO) << "Pausing sending status updates";
+    LOG(INFO) << "Pausing " << statusUpdateType << " manager";
     paused = true;
   }
 
   void resume()
   {
-    LOG(INFO) << "Resuming sending status updates";
+    LOG(INFO) << "Resuming " << statusUpdateType << " manager";
     paused = false;
 
     foreachpair (const IDType& streamId,
@@ -379,7 +384,7 @@ public:
       if (next.isSome()) {
         const UpdateType& update = next.get();
 
-        LOG(WARNING) << "Sending status update " << update;
+        LOG(INFO) << "Sending " << statusUpdateType << " " << update;
 
         stream->timeout =
           forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
@@ -399,11 +404,12 @@ private:
       const Option<FrameworkID>& frameworkId,
       bool checkpoint)
   {
-    VLOG(1) << "Creating status update stream " << stringify(streamId)
-            << " checkpoint=" << stringify(checkpoint);
+    VLOG(1) << "Creating " << statusUpdateType << " stream "
+            << stringify(streamId) << " checkpoint=" << stringify(checkpoint);
 
     Try<process::Owned<StatusUpdateStream>> stream =
       StatusUpdateStream::create(
+          statusUpdateType,
           streamId,
           frameworkId,
           checkpoint ? Option<std::string>(getPath(streamId)) : None());
@@ -427,12 +433,14 @@ private:
       const IDType& streamId,
       bool strict)
   {
-    VLOG(1) << "Recovering status update stream " << stringify(streamId);
+    VLOG(1) << "Recovering " << statusUpdateType << " stream "
+            << stringify(streamId);
 
     Result<std::pair<
         process::Owned<StatusUpdateStream>,
         typename StatusUpdateStream::State>> result =
-          StatusUpdateStream::recover(streamId, getPath(streamId), strict);
+          StatusUpdateStream::recover(
+              statusUpdateType, streamId, getPath(streamId), strict);
 
     if (result.isError()) {
       return Error(result.error());
@@ -472,10 +480,11 @@ private:
 
   void cleanupStatusUpdateStream(const IDType& streamId)
   {
-    VLOG(1) << "Cleaning up status update stream " << stringify(streamId);
+    VLOG(1) << "Cleaning up " << statusUpdateType << " stream "
+            << stringify(streamId);
 
-    CHECK(streams.contains(streamId)) << "Cannot find the status update stream "
-                                      << stringify(streamId);
+    CHECK(streams.contains(streamId)) << "Cannot find " << statusUpdateType
+                                      << " stream " << stringify(streamId);
 
     StatusUpdateStream* stream = streams[streamId].get();
 
@@ -502,7 +511,7 @@ private:
   {
     CHECK(!paused);
 
-    VLOG(1) << "Forwarding status update " << update;
+    VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
 
     forwardCallback(update);
 
@@ -535,7 +544,7 @@ private:
 
       if (stream->timeout->expired()) {
         const UpdateType& update = stream->pending.front();
-        LOG(WARNING) << "Resending status update " << update;
+        LOG(WARNING) << "Resending " << statusUpdateType << " " << update;
 
         // Bounded exponential backoff.
         Duration duration_ =
@@ -546,6 +555,10 @@ private:
     }
   }
 
+  // Type of status updates handled by the stream, e.g., "offer operation
+  // status update".
+  const std::string statusUpdateType;
+
   lambda::function<void(UpdateType)> forwardCallback;
   lambda::function<const std::string(const IDType&)> getPath;
 
@@ -576,13 +589,15 @@ private:
 
         if (close.isError()) {
           CHECK_SOME(path);
-          LOG(WARNING) << "Failed to close file '" << path.get()
-                       << "': " << close.error();
+          LOG(WARNING) << "Failed to close " << statusUpdateType
+                       << " stream file '" << path.get() << "': "
+                       << close.error();
         }
       }
     }
 
     static Try<process::Owned<StatusUpdateStream>> create(
+        const std::string& statusUpdateType,
         const IDType& streamId,
         const Option<FrameworkID>& frameworkId,
         const Option<std::string>& path)
@@ -591,8 +606,7 @@ private:
 
       if (path.isSome()) {
         if (os::exists(path.get())) {
-          return Error(
-              "The status updates file '" + path.get() + "' already exists.");
+          return Error("The file '" + path.get() + "' already exists");
         }
 
         // Create the base updates directory, if it doesn't exist.
@@ -611,15 +625,14 @@ private:
 
         if (result.isError()) {
           return Error(
-              "Failed to open '" + path.get() +
-              "' for status updates: " + result.error());
+              "Failed to open '" + path.get() + "' : " + result.error());
         }
 
         fd = result.get();
       }
 
       process::Owned<StatusUpdateStream> stream(
-          new StatusUpdateStream(streamId, path, fd));
+          new StatusUpdateStream(statusUpdateType, streamId, path, fd));
 
       stream->frameworkId = frameworkId;
 
@@ -627,8 +640,11 @@ private:
     }
 
 
-    static Result<std::pair<process::Owned<StatusUpdateStream>, State>>
-    recover(const IDType& streamId, const std::string& path, bool strict)
+    static Result<std::pair<process::Owned<StatusUpdateStream>, State>> recover(
+        const std::string& statusUpdateType,
+        const IDType& streamId,
+        const std::string& path,
+        bool strict)
     {
       if (os::exists(Path(path).dirname()) && !os::exists(path)) {
         // This could happen if the process died before it checkpointed any
@@ -640,15 +656,14 @@ private:
       Try<int_fd> fd = os::open(path, O_SYNC | O_RDWR | O_CLOEXEC);
 
       if (fd.isError()) {
-        return Error(
-            "Failed to open status updates stream file '" + path +
-            "': " + fd.error());
+        return Error("Failed to open '" + path + "': " + fd.error());
       }
 
       process::Owned<StatusUpdateStream> stream(
-          new StatusUpdateStream(streamId, path, fd.get()));
+          new StatusUpdateStream(statusUpdateType, streamId, path, fd.get()));
 
-      VLOG(1) << "Replaying updates for stream " << stringify(streamId);
+      VLOG(1) << "Replaying " << statusUpdateType << " stream "
+              << stringify(streamId);
 
       // Read the updates/acknowledgments, building both the stream's in-memory
       // structures and the state object which will be returned.
@@ -674,7 +689,7 @@ private:
 
             if (update.isNone()) {
               return Error(
-                  "Unexpected status update acknowledgment"
+                  "Unexpected " + statusUpdateType + " acknowledgment"
                   " (UUID: " + UUID::fromBytes(record->uuid())->toString() +
                   ") for stream " + stringify(streamId));
             }
@@ -696,23 +711,20 @@ private:
       Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR);
       if (currentPosition.isError()) {
         return Error(
-            "Failed to lseek status updates stream file '" + path +
-            "': " + currentPosition.error());
+            "Failed to lseek file '" + path + "': " + currentPosition.error());
       }
 
       Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get());
 
       if (truncated.isError()) {
         return Error(
-            "Failed to truncate status updates file '" + path +
-            "': " + truncated.error());
+            "Failed to truncate file '" + path + "': " + truncated.error());
       }
 
       // After reading a non-corrupted updates file, `record` should be `none`.
       if (record.isError()) {
         std::string message =
-          "Failed to read status updates file  '" + path +
-          "': " + record.error();
+          "Failed to read file '" + path + "': " + record.error();
 
         if (strict) {
           return Error(message);
@@ -732,8 +744,7 @@ private:
 
         if (removed.isError()) {
           return Error(
-              "Failed to remove status updates file '" + path +
-              "': " + removed.error());
+              "Failed to remove file '" + path + "': " + removed.error());
         }
 
         return None();
@@ -766,14 +777,15 @@ private:
 
       // Check that this status update has not already been acknowledged.
       if (acknowledged.contains(statusUuid.get())) {
-        LOG(WARNING) << "Ignoring status update " << update
+        LOG(WARNING) << "Ignoring " << statusUpdateType << " " << update
                      << " that has already been acknowledged";
         return false;
       }
 
       // Check that this update has not already been received.
       if (received.contains(statusUuid.get())) {
-        LOG(WARNING) << "Ignoring duplicate status update " << update;
+        LOG(WARNING) << "Ignoring duplicate " << statusUpdateType << " "
+                     << update;
         return false;
       }
 
@@ -807,15 +819,15 @@ private:
       // acknowledgments for both the original and the retried update.
       if (_update.isNone()) {
         return Error(
-            "Unexpected status update acknowledgment (UUID: " +
-            statusUuid.toString() + ") for stream " + stringify(streamId));
+            "Unexpected acknowledgment (UUID: " + statusUuid.toString() +
+            ") for " + statusUpdateType + " stream " + stringify(streamId));
       }
 
       const UpdateType& update = _update.get();
 
       if (acknowledged.contains(statusUuid)) {
-        LOG(WARNING) << "Duplicate status update acknowledgment"
-                     << " for update " << update;
+        LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType
+                     << " " << update;
         return false;
       }
 
@@ -830,9 +842,10 @@ private:
       // This might happen if we retried a status update and got back
       // acknowledgments for both the original and the retried update.
       if (statusUuid != updateStatusUuid.get()) {
-        LOG(WARNING) << "Unexpected status update acknowledgement"
-                     << " (received " << statusUuid << ", expecting "
-                     << updateStatusUuid.get() << ") for update " << update;
+        LOG(WARNING) << "Unexpected " << statusUpdateType
+                     << " acknowledgment (received " << statusUuid
+                     << ", expecting " << updateStatusUuid.get() << ") for "
+                     << update;
         return false;
       }
 
@@ -869,10 +882,15 @@ private:
 
   private:
     StatusUpdateStream(
+        const std::string& _statusUpdateType,
         const IDType& _streamId,
         const Option<std::string>& _path,
         Option<int_fd> _fd)
-      : terminated(false), streamId(_streamId), path(_path), fd(_fd) {}
+      : terminated(false),
+        statusUpdateType(_statusUpdateType),
+        streamId(_streamId),
+        path(_path),
+        fd(_fd) {}
 
     // Handles the status update and writes it to disk, if necessary.
     //
@@ -890,8 +908,8 @@ private:
 
       // Checkpoint the update if necessary.
       if (checkpointed()) {
-        LOG(INFO) << "Checkpointing " << type << " for status update "
-                  << update;
+        LOG(INFO) << "Checkpointing " << type << " for " << statusUpdateType
+                  << " " << update;
 
         CHECK_SOME(fd);
 
@@ -914,8 +932,7 @@ private:
         Try<Nothing> write = ::protobuf::write(fd.get(), record);
         if (write.isError()) {
           error =
-            "Failed to write acknowledgement for status update " +
-            stringify(update) + " to '" + path.get() + "': " + write.error();
+            "Failed to write to file '" + path.get() + "': " + write.error();
           return Error(error.get());
         }
       }
@@ -965,6 +982,10 @@ private:
       }
     }
 
+    // Type of status updates handled by the stream, e.g., "offer operation
+    // status update".
+    const std::string& statusUpdateType;
+
     const IDType streamId;
 
     const Option<std::string> path; // File path of the update stream.


[4/7] mesos git commit: Made the agent respond to reconciliation requests for default resources.

Posted by gr...@apache.org.
Made the agent respond to reconciliation requests for default resources.

Review: https://reviews.apache.org/r/64548/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0d5f5c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0d5f5c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0d5f5c4

Branch: refs/heads/master
Commit: d0d5f5c46de7dabb08d54b7c7f8c7f31e184de52
Parents: 01832c0
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:19:19 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 29 ++++++++++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d0d5f5c4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 9584326..d997b42 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3898,14 +3898,37 @@ void Slave::reconcileOfferOperations(
 {
   bool containsResourceProviderOperations = false;
 
-  // TODO(greggomann): Implement reconciliation for offer
-  // operations on the agent's default resources.
   foreach (
       const ReconcileOfferOperationsMessage::Operation& operation,
       message.operations()) {
     if (operation.has_resource_provider_id()) {
       containsResourceProviderOperations = true;
-      break;
+      continue;
+    }
+
+    Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+    CHECK_SOME(operationUuid);
+
+    // The master reconciles when it notices that an operation is missing from
+    // an `UpdateSlaveMessage`. If we cannot find an operation in the agent
+    // state, we send an update to inform the master. If we do find the
+    // operation, then the master and agent state are consistent and we do not
+    // need to do anything.
+    OfferOperation* storedOperation = getOfferOperation(operationUuid.get());
+    if (storedOperation == nullptr) {
+      // For agent default resources, we send best-effort offer operation status
+      // updates to the master. This is satisfactory because a dropped message
+      // would imply a subsequent agent reregistration, after which an
+      // `UpdateSlaveMessage` would be sent with pending operations.
+      OfferOperationStatusUpdate update =
+        protobuf::createOfferOperationStatusUpdate(
+            operationUuid.get(),
+            protobuf::createOfferOperationStatus(OFFER_OPERATION_DROPPED),
+            None(),
+            None(),
+            info.id());
+
+      send(master.get(), update);
     }
   }
 


[6/7] mesos git commit: Avoided returning prematurely in an agent handler.

Posted by gr...@apache.org.
Avoided returning prematurely in an agent handler.

Review: https://reviews.apache.org/r/64504/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01832c02
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01832c02
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01832c02

Branch: refs/heads/master
Commit: 01832c0212c46d769d2023c61f2afd22fd21daa2
Parents: 5c91546
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:19:00 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/01832c02/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5869e73..9584326 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7225,7 +7225,7 @@ void Slave::handleResourceProviderMessage(
                      << update.status().operation_id() << "' (uuid: "
                      << operationUUID->toString() << ") for framework "
                      << update.framework_id();
-        return;
+        break;
       }
 
       updateOfferOperation(operation, update);


[3/7] mesos git commit: Made master reconcile known offer operations with agent.

Posted by gr...@apache.org.
Made master reconcile known offer operations with agent.

In cases where the agent fails over or where an `UpdateSlaveMessage`
races with an `ApplyOfferOperationMessage`, it's possible that the
master knows about an offer operation which is not contained in an
`UpdateSlaveMessage`. In such cases, the master should send a
`ReconcileOfferOperationsMessage` to the agent. The agent will
then respond by sending OFFER_OPERATION_DROPPED status updates for
any operations which it does not know about.

Review: https://reviews.apache.org/r/64464/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c91546b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c91546b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c91546b

Branch: refs/heads/master
Commit: 5c91546babf59a42c4e3fc98d5c712e8d1ddd3d3
Parents: 9d7da9b
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:18:41 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 50 ++++++++++++++++++++++++++++------------------
 1 file changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5c91546b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index efe8b8f..806fbc2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7575,6 +7575,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     }
   }
 
+  ReconcileOfferOperationsMessage reconcile;
+
   // Update master and allocator state.
   foreachpair (
       const Option<ResourceProviderID>& providerId,
@@ -7637,29 +7639,35 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     } else {
       // If this is a known resource provider or agent its total capacity cannot
       // have changed, and it would not know about any non-terminal offer
-      // operations not already known to the master. It might however have not
-      // received an offer operations since the resource provider or agent fell
-      // over before the message could be received. We need to remove these
-      // operations from our state.
-
-      // Reconcile offer operations. This includes recovering
-      // resources in used by operations which did not reach the
-      // agent or resource provider.
+      // operations not already known to the master. However, it might not have
+      // received an offer operation for a couple different reasons:
+      //   - The resource provider or agent could have failed over before the
+      //     operation's `ApplyOfferOperationMessage` could be received.
+      //   - The operation's `ApplyOfferOperationMessage` could have raced with
+      //     this `UpdateSlaveMessage`.
+      //
+      // In both of these cases, we need to reconcile such operations explicitly
+      // with the agent. For operations which the agent or resource provider
+      // does not recognize, an OFFER_OPERATION_DROPPED status update will be
+      // generated and the master will remove the operation from its state upon
+      // receipt of that update.
       if (provider.oldOfferOperations.isSome()) {
         foreachkey (const UUID& uuid, provider.oldOfferOperations.get()) {
           if (provider.newOfferOperations.isNone() ||
               !provider.newOfferOperations->contains(uuid)) {
-            // TODO(bbannier): Instead of simply dropping an operation with
-            // `removeOfferOperation` here we should instead send a `Reconcile`
-            // message with a failed state to the agent so its status update
-            // manager can reliably deliver the operation status to the
-            // framework.
-            LOG(WARNING) << "Dropping known offer operation " << uuid.toString()
-                         << " since it was not present in reconciliation "
-                            "message from agent";
-
-            CHECK(slave->offerOperations.contains(uuid));
-            removeOfferOperation(slave->offerOperations.at(uuid));
+            LOG(WARNING) << "Performing explicit reconciliation with agent for"
+                         << " known offer operation " << uuid.toString()
+                         << " since it was not present in original"
+                         << " reconciliation message from agent";
+
+            ReconcileOfferOperationsMessage::Operation* reconcileOperation =
+              reconcile.add_operations();
+            reconcileOperation->set_operation_uuid(uuid.toBytes());
+
+            if (providerId.isSome()) {
+              reconcileOperation->mutable_resource_provider_id()
+                ->CopyFrom(providerId.get());
+            }
           }
         }
       }
@@ -7679,6 +7687,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     }
   }
 
+  if (reconcile.operations_size() > 0) {
+    send(slave->pid, reconcile);
+  }
+
   // Now update the agent's state and total resources in the allocator.
   allocator->updateSlave(slaveId, slave->info, slave->totalResources);
 


[5/7] mesos git commit: Fixed naming of a variable in `status_update_manager_process.hpp`.

Posted by gr...@apache.org.
Fixed naming of a variable in `status_update_manager_process.hpp`.

Renamed the variable `_update` to `update_` so that the name is
consistent with the style guide.

Review: https://reviews.apache.org/r/64473/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e70097c5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e70097c5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e70097c5

Branch: refs/heads/master
Commit: e70097c55bf63c23eedf3151047c40a1b698240e
Parents: 3fa8d64
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:15 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800

----------------------------------------------------------------------
 .../status_update_manager_process.hpp                     | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e70097c5/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index a44551f..935f754 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -810,20 +810,20 @@ private:
       }
 
       // Get the corresponding update for this ACK.
-      const Result<UpdateType>& _update = next();
-      if (_update.isError()) {
-        return Error(_update.error());
+      const Result<UpdateType>& update_ = next();
+      if (update_.isError()) {
+        return Error(update_.error());
       }
 
       // This might happen if we retried a status update and got back
       // acknowledgments for both the original and the retried update.
-      if (_update.isNone()) {
+      if (update_.isNone()) {
         return Error(
             "Unexpected acknowledgment (UUID: " + statusUuid.toString() +
             ") for " + statusUpdateType + " stream " + stringify(streamId));
       }
 
-      const UpdateType& update = _update.get();
+      const UpdateType& update = update_.get();
 
       if (acknowledged.contains(statusUuid)) {
         LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType