You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/14 01:11:06 UTC

[1/3] mesos git commit: Made `StatusUpdateManagerProcess` fill in the latest status.

Repository: mesos
Updated Branches:
  refs/heads/master aaf043382 -> c3c070094


Made `StatusUpdateManagerProcess` fill in the latest status.

Review: https://reviews.apache.org/r/64521/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c728f8e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c728f8e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c728f8e7

Branch: refs/heads/master
Commit: c728f8e73537bf6f269b9a1cddd06d92dc890dd0
Parents: aaf0433
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Dec 13 16:02:32 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:56 2017 -0800

----------------------------------------------------------------------
 .../status_update_manager_process.hpp           |  44 +++---
 ...er_operation_status_update_manager_tests.cpp | 148 ++++++++++++++++---
 2 files changed, 157 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 8c9e06f..1536bcc 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -165,6 +165,11 @@ public:
     CHECK(streams.contains(streamId));
     StatusUpdateStream* stream = streams[streamId].get();
 
+    if (update.has_latest_status()) {
+      return process::Failure(
+          "Expected " + statusUpdateType + " to not contain 'latest_status'");
+    }
+
     // Verify that we didn't get a non-checkpointable update for a
     // stream that is checkpointable, and vice-versa.
     if (stream->checkpointed() != checkpoint) {
@@ -223,7 +228,7 @@ public:
 
       CHECK_SOME(next);
       stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     return Nothing();
@@ -286,7 +291,7 @@ public:
     } else if (!paused && next.isSome()) {
       // Forward the next queued status update.
       stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     return !terminated;
@@ -378,9 +383,7 @@ public:
     LOG(INFO) << "Resuming " << statusUpdateType << " manager";
     paused = false;
 
-    foreachpair (const IDType& streamId,
-                 process::Owned<StatusUpdateStream>& stream,
-                 streams) {
+    foreachvalue (process::Owned<StatusUpdateStream>& stream, streams) {
       const Result<UpdateType>& next = stream->next();
 
       if (next.isSome()) {
@@ -388,8 +391,8 @@ public:
 
         LOG(INFO) << "Sending " << statusUpdateType << " " << update;
 
-        stream->timeout =
-          forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+        stream->timeout = forward(
+            stream.get(), update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
       }
     }
   }
@@ -471,8 +474,8 @@ private:
 
     if (!paused && next.isSome()) {
       // Forward the next queued status update.
-      stream->timeout =
-        forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+      stream->timeout = forward(
+          stream.get(), next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
     }
 
     streams[streamId] = std::move(stream);
@@ -507,11 +510,18 @@ private:
   // Forwards the status update and starts a timer based on the `duration` to
   // check for ACK.
   process::Timeout forward(
-      const IDType& streamId,
-      const UpdateType& update,
+      StatusUpdateStream* stream,
+      const UpdateType& _update,
       const Duration& duration)
   {
     CHECK(!paused);
+    CHECK(!_update.has_latest_status());
+    CHECK_NOTNULL(stream);
+
+    UpdateType update(_update);
+    update.mutable_latest_status()->CopyFrom(
+        stream->pending.empty() ? _update.status()
+                                : stream->pending.back().status());
 
     VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
 
@@ -526,7 +536,7 @@ private:
             CheckpointType,
             UpdateType>>::self(),
         &StatusUpdateManagerProcess::timeout,
-        streamId,
+        stream->streamId,
         duration)
       .timeout();
   }
@@ -552,7 +562,7 @@ private:
         Duration duration_ =
           std::min(duration * 2, slave::STATUS_UPDATE_RETRY_INTERVAL_MAX);
 
-        stream->timeout = forward(streamId, update, duration_);
+        stream->timeout = forward(stream, update, duration_);
       }
     }
   }
@@ -877,6 +887,8 @@ private:
     // Returns `true` if the stream is checkpointed, `false` otherwise.
     bool checkpointed() { return path.isSome(); }
 
+    const IDType streamId;
+
     bool terminated;
     Option<FrameworkID> frameworkId;
     Option<process::Timeout> timeout; // Timeout for resending status update.
@@ -888,9 +900,9 @@ private:
         const IDType& _streamId,
         const Option<std::string>& _path,
         Option<int_fd> _fd)
-      : terminated(false),
+      : streamId(_streamId),
+        terminated(false),
         statusUpdateType(_statusUpdateType),
-        streamId(_streamId),
         path(_path),
         fd(_fd) {}
 
@@ -988,8 +1000,6 @@ private:
     // status update".
     const std::string& statusUpdateType;
 
-    const IDType streamId;
-
     const Option<std::string> path; // File path of the update stream.
     const Option<int_fd> fd; // File descriptor to the update stream.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/tests/offer_operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp b/src/tests/offer_operation_status_update_manager_tests.cpp
index a5327d3..e56fb0e 100644
--- a/src/tests/offer_operation_status_update_manager_tests.cpp
+++ b/src/tests/offer_operation_status_update_manager_tests.cpp
@@ -148,8 +148,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -180,8 +183,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
   // should return `true`.
@@ -214,8 +220,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   EXPECT_FALSE(forwardedStatusUpdate2.isReady());
 
@@ -224,7 +233,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
   Clock::settle();
 
   // Verify that the status update is forwarded again.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -262,8 +271,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Cleanup the framework.
   statusUpdateManager->cleanup(frameworkId);
@@ -297,8 +309,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   resetStatusUpdateManager();
 
@@ -329,7 +344,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
   EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
 
   // Check that the status update is resent.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 }
 
 
@@ -352,8 +367,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
   // Send a non-checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Verify that the stream file is NOT created.
   EXPECT_TRUE(!os::exists(getPath(operationUuid)));
@@ -388,8 +406,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -442,8 +463,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyDirectory)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -490,8 +514,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverTerminatedStream)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a terminal update, so `acknowledgement`
   // should return `false`.
@@ -545,8 +572,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
   // should return `true`.
@@ -582,8 +612,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
   // should return `true`.
@@ -624,8 +657,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAck)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
   // should return `true`.
@@ -662,8 +698,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   // Acknowledge the update, this is a non-terminal update, so `acknowledgement`
   // should return `true`.
@@ -707,8 +746,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
 
   resetStatusUpdateManager();
 
@@ -747,7 +789,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
   EXPECT_EQ(statusUpdate, recoveredUpdate.get());
 
   // Check that the status update is resent.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
 }
 
 
@@ -770,8 +812,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
   // Send a checkpointed offer operation status update.
   AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
 
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
   // Verify that the status update is forwarded.
-  AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
 
   resetStatusUpdateManager();
 
@@ -793,6 +838,73 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
   AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
 }
 
+
+// This test verifies that the status update manager correctly fills in the
+// latest status when (re)sending status updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, UpdateLatestWhenResending)
+{
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
+  Future<OfferOperationStatusUpdate> forwardedStatusUpdate3;
+  EXPECT_CALL(statusUpdateProcessor, update(_))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
+    .WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
+
+  const UUID operationUuid = UUID::random();
+
+  const UUID statusUuid1 = UUID::random();
+  OfferOperationStatusUpdate statusUpdate1 = createOfferOperationStatusUpdate(
+      statusUuid1, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
+
+  // Send a checkpointed offer operation status update.
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
+
+  // The status update manager should fill in the `latest_status` field with the
+  // status update we just sent.
+  OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate1);
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+      statusUpdate1.status());
+
+  // Verify that the status update is forwarded.
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+  EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+  // Send another status update.
+  const UUID statusUuid2 = UUID::random();
+  OfferOperationStatusUpdate statusUpdate2 = createOfferOperationStatusUpdate(
+      statusUuid2, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
+  AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
+
+  // Advance the clock to trigger a retry of the first update.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  // Now that another status update was sent, the status update manager should
+  // fill in the `latest_status` field with this new status update.
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+      statusUpdate2.status());
+
+  // Verify that the status update is forwarded again.
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+
+  EXPECT_FALSE(forwardedStatusUpdate3.isReady());
+
+  // Acknowledge the first update, it is NOT a terminal update, so
+  // `acknowledgement` should return `true`. The status update manager
+  // should now send the second status update.
+  AWAIT_EXPECT_TRUE(
+      statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
+
+  // The status update manager should then forward the latest status update.
+  expectedStatusUpdate = statusUpdate2;
+  expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+    statusUpdate2.status());
+
+  AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[3/3] mesos git commit: Fixed a typo in 'agent.proto'.

Posted by gr...@apache.org.
Fixed a typo in 'agent.proto'.

Review: https://reviews.apache.org/r/64560/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3c07009
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3c07009
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3c07009

Branch: refs/heads/master
Commit: c3c07009451197a0348df02cdebcb9a1a293b131
Parents: 3100e9a
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Dec 13 16:03:06 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:57 2017 -0800

----------------------------------------------------------------------
 include/mesos/agent/agent.proto    | 2 +-
 include/mesos/v1/agent/agent.proto | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c3c07009/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 6fcca6a..c3b1f22 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -310,7 +310,7 @@ message Call {
   // config file that describes a resource provider of the specified
   // type and name in the resource provider config directory. The
   // 'info.id' field should not be set. The resource provider will be
-  // relaunched asynchoronously to reflect the changes in its config.
+  // relaunched asynchronously to reflect the changes in its config.
   // Note that only config files that exist at agent startup can be
   // updated though this call.
   //

http://git-wip-us.apache.org/repos/asf/mesos/blob/c3c07009/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 57c3518..280a3b0 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -310,7 +310,7 @@ message Call {
   // config file that describes a resource provider of the specified
   // type and name in the resource provider config directory. The
   // 'info.id' field should not be set. The resource provider will be
-  // relaunched asynchoronously to reflect the changes in its config.
+  // relaunched asynchronously to reflect the changes in its config.
   // Note that only config files that exist at agent startup can be
   // updated though this call.
   //


[2/3] mesos git commit: Initialized offer operation status update manager in SLRP.

Posted by gr...@apache.org.
Initialized offer operation status update manager in SLRP.

This patch adds an agent filesystem layout for checkpointing offer
operation status updates for resource providers, and initializes
a status update manager in the storage local resource provider.

Review: https://reviews.apache.org/r/64475/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3100e9aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3100e9aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3100e9aa

Branch: refs/heads/master
Commit: 3100e9aa0ac9b6bcc92643b145e2730fc862ea39
Parents: c728f8e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Dec 13 16:02:33 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:57 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 130 +++++++++++++++++++-----
 src/slave/paths.cpp                        |  54 ++++++++++
 src/slave/paths.hpp                        |  23 +++++
 3 files changed, 179 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e806f44..2fd4a3b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -66,6 +66,8 @@
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
 
+#include "status_update_manager/offer_operation.hpp"
+
 namespace http = process::http;
 
 using std::accumulate;
@@ -327,6 +329,7 @@ private:
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
+  Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
   Future<Nothing> reconcile();
 
@@ -359,8 +362,8 @@ private:
   Future<Nothing> deleteVolume(const string& volumeId);
 
   // Applies the offer operation. Conventional operations will be
-  // synchoronusly applied.
-  Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+  // synchronously applied.
+  Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
       const Resource& resource,
@@ -375,9 +378,15 @@ private:
       const Try<vector<ResourceConversion>>& conversions);
 
   void checkpointResourceProviderState();
-  void sendResourceProviderStateUpdate();
   void checkpointVolumeState(const string& volumeId);
 
+  void sendResourceProviderStateUpdate();
+
+  // NOTE: This is a callback for the status update manager and should
+  // not be called directly.
+  void sendOfferOperationStatusUpdate(
+      const OfferOperationStatusUpdate& statusUpdate);
+
   enum State
   {
     RECOVERING,
@@ -400,6 +409,7 @@ private:
   hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
+  OfferOperationStatusUpdateManager statusUpdateManager;
 
   ContainerID controllerContainerId;
   ContainerID nodeContainerId;
@@ -611,15 +621,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
       // We replay all pending operations here, so that if a volume is
       // actually created before the last failover, it will be reflected
       // in the updated total resources before we do the reconciliation.
-      // NOTE: `applyOfferOperation` will remove the applied operation
+      // NOTE: `_applyOfferOperation` will remove the applied operation
       // from the list of pending operations, so we make a copy of keys
       // here.
       foreach (const UUID& uuid, pendingOperations.keys()) {
-        applyOfferOperation(uuid)
+        _applyOfferOperation(uuid)
           .onAny(defer(self(), [=](const Future<Nothing>& future) {
             if (!future.isReady()) {
               LOG(ERROR)
-                << "Failed to apply operation " << uuid << ": "
+                << "Failed to apply offer operation with UUID " << uuid << ": "
                 << (future.isFailed() ? future.failure() : "future discarded");
             }
           }));
@@ -841,6 +851,27 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 }
 
 
+Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+{
+  CHECK(info.has_id());
+
+  const string resourceProviderDir = slave::paths::getResourceProviderPath(
+      metaDir, slaveId, info.type(), info.name(), info.id());
+
+  statusUpdateManager.initialize(
+      defer(self(), &Self::sendOfferOperationStatusUpdate, lambda::_1),
+      std::bind(
+          &slave::paths::getOfferOperationUpdatesPath,
+          resourceProviderDir,
+          lambda::_1));
+
+  statusUpdateManager.pause();
+
+  // TODO(chhsiao): Recover status updates.
+  return Nothing();
+}
+
+
 void StorageLocalResourceProviderProcess::doReliableRegistration()
 {
   if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -872,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration()
 
 Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
 {
-  return importResources()
+  return recoverStatusUpdates()
+    .then(defer(self(), &Self::importResources))
     .then(defer(self(), [=](Resources importedResources) {
       // NODE: If a resource in the checkpointed total resources is
       // missing in the imported resources, we will still keep it if it
@@ -945,6 +977,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
       }
 
       sendResourceProviderStateUpdate();
+      statusUpdateManager.resume();
 
       state = READY;
 
@@ -1005,7 +1038,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
   pendingOperations[uuid.get()] = operation;
   checkpointResourceProviderState();
 
-  applyOfferOperation(uuid.get())
+  _applyOfferOperation(uuid.get())
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       if (!future.isReady()) {
         LOG(ERROR)
@@ -1942,8 +1975,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
 
           if (volumes.contains(volumeInfo.id())) {
             // The resource provider failed over after the last
-            // `CreateVolume` call, but before the operation status
-            // was checkpointed.
+            // `CreateVolume` call, but before the offer operation
+            // status was checkpointed.
             CHECK_EQ(csi::state::VolumeState::CREATED,
                      volumes.at(volumeInfo.id()).state.state());
           } else {
@@ -2027,7 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
     }
   } else {
     // The resource provider failed over after the last `DeleteVolume`
-    // call, but before the operation status was checkpointed.
+    // call, but before the offer operation status was checkpointed.
     CHECK(!os::exists(volumePath));
   }
 
@@ -2038,7 +2071,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     const UUID& operationUuid)
 {
   Future<vector<ResourceConversion>> conversions;
@@ -2205,7 +2238,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     created = resource.disk().source().id();
   } else {
     // We use the operation UUID as the name of the volume, so the same
-    // operation will create the same volume after recovery.
+    // offer operation will create the same volume after recovery.
     // TODO(chhsiao): Call `CreateVolume` sequentially with other create
     // or delete operations.
     // TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
@@ -2389,8 +2422,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
 
   auto err = [](const UUID& operationUuid, const string& message) {
     LOG(ERROR)
-      << "Failed to send status update for offer operation " << operationUuid
-      << ": " << message;
+      << "Failed to send status update for offer operation with UUID "
+      << operationUuid << ": " << message;
   };
 
   driver->send(evolve(call))
@@ -2426,8 +2459,28 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, state))
-    << "Failed to checkpoint resource provider state to '" << statePath << "'";
+  Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint resource provider state to '" << statePath << "': "
+    << checkpoint.error();
+}
+
+
+void StorageLocalResourceProviderProcess::checkpointVolumeState(
+    const string& volumeId)
+{
+  const string statePath = csi::paths::getVolumeStatePath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      volumeId);
+
+  Try<Nothing> checkpoint =
+    slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+  // A failed checkpoint is fatal; report the underlying error as well.
+  CHECK_SOME(checkpoint)
+    << "Failed to checkpoint volume state to '" << statePath << "': "
+    << checkpoint.error();
 }
 
 
@@ -2443,8 +2496,8 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
                const Event::ApplyOfferOperation& operation,
                pendingOperations) {
     // TODO(chhsiao): Maintain a list of terminated but unacknowledged
-    // offer operations in memory and reconstruc that during recovery
-    // by querying status update manager.
+    // offer operations in memory and reconstruct it during recovery
+    // by querying the status update manager.
     update->add_operations()->CopyFrom(
         protobuf::createOfferOperation(
             operation.info(),
@@ -2475,17 +2528,38 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 }
 
 
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
-    const string& volumeId)
+void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate(
+      const OfferOperationStatusUpdate& statusUpdate)
 {
-  const string statePath = csi::paths::getVolumeStatePath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      volumeId);
+  Call call;
+  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+  call.mutable_resource_provider_id()->CopyFrom(info.id());
 
-  CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state))
-    << "Failed to checkpoint volume state to '" << statePath << "'";
+  Call::UpdateOfferOperationStatus* update =
+    call.mutable_update_offer_operation_status();
+  update->set_operation_uuid(statusUpdate.operation_uuid());
+  update->mutable_status()->CopyFrom(statusUpdate.status());
+
+  if (statusUpdate.has_framework_id()) {
+    update->mutable_framework_id()->CopyFrom(statusUpdate.framework_id());
+  }
+
+  // The latest status should have been set by the status update manager.
+  CHECK(statusUpdate.has_latest_status());
+  update->mutable_latest_status()->CopyFrom(statusUpdate.latest_status());
+
+  auto err = [](const UUID& uuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to send status update for offer operation with UUID " << uuid
+      << ": " << message;
+  };
+
+  Try<UUID> uuid = UUID::fromBytes(statusUpdate.operation_uuid());
+  CHECK_SOME(uuid);
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, uuid.get(), lambda::_1))
+    .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index b8004e7..f9f0c78 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -65,6 +65,7 @@ const char TASK_UPDATES_FILE[] = "task.updates";
 const char RESOURCES_INFO_FILE[] = "resources.info";
 const char RESOURCES_TARGET_FILE[] = "resources.target";
 const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state";
+const char OFFER_OPERATION_UPDATES_FILE[] = "operation.updates";
 
 
 const char CONTAINERS_DIR[] = "containers";
@@ -75,6 +76,7 @@ const char EXECUTORS_DIR[] = "executors";
 const char EXECUTOR_RUNS_DIR[] = "runs";
 const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
 const char RESOURCE_PROVIDERS_DIR[] = "resource_providers";
+const char OFFER_OPERATIONS_DIR[] = "operations";
 
 
 Try<ExecutorRunPath> parseExecutorRunPath(
@@ -545,6 +547,58 @@ string getLatestResourceProviderPath(
 }
 
 
+Try<list<string>> getOfferOperationPaths(
+    const string& rootDir)
+{
+  return fs::list(path::join(rootDir, OFFER_OPERATIONS_DIR, "*"));
+}
+
+
+string getOfferOperationPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(rootDir, OFFER_OPERATIONS_DIR, operationUuid.toString());
+}
+
+
+Try<UUID> parseOfferOperationPath(
+    const string& rootDir,
+    const string& dir)
+{
+  // TODO(chhsiao): Consider using `<regex>`, which requires GCC 4.9+.
+
+  // Make sure there's a separator at the end of the prefix so that we
+  // don't accidentally slice off part of a directory.
+  const string prefix = path::join(rootDir, OFFER_OPERATIONS_DIR, "");
+
+  if (!strings::startsWith(dir, prefix)) {
+    return Error(
+        "Directory '" + dir + "' does not fall under operations directory '" +
+        prefix + "'");
+  }
+
+  Try<UUID> operationUuid = UUID::fromString(Path(dir).basename());
+  if (operationUuid.isError()) {
+    return Error(
+        "Could not decode offer operation UUID from string '" +
+        Path(dir).basename() + "': " + operationUuid.error());
+  }
+
+  return operationUuid.get();
+}
+
+
+string getOfferOperationUpdatesPath(
+    const string& rootDir,
+    const UUID& operationUuid)
+{
+  return path::join(
+      getOfferOperationPath(rootDir, operationUuid),
+      OFFER_OPERATION_UPDATES_FILE);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d645d87..bae68d0 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -23,6 +23,7 @@
 #include <mesos/mesos.hpp>
 
 #include <stout/try.hpp>
+#include <stout/uuid.hpp>
 
 namespace mesos {
 namespace internal {
@@ -79,6 +80,9 @@ namespace paths {
 //   |           |           |-- latest (symlink)
 //   |           |           |-- <resource_provider_id>
 //   |           |               |-- resource_provider.state
+//   |           |               |-- operations
+//   |           |                   |-- <operation_uuid>
+//   |           |                       |-- operation.updates
 //   |           |-- frameworks
 //   |               |-- <framework_id>
 //   |                   |-- framework.info
@@ -349,6 +353,25 @@ std::string getLatestResourceProviderPath(
     const std::string& resourceProviderName);
 
 
+Try<std::list<std::string>> getOfferOperationPaths(
+    const std::string& rootDir);
+
+
+std::string getOfferOperationPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
+Try<UUID> parseOfferOperationPath(
+    const std::string& rootDir,
+    const std::string& dir);
+
+
+std::string getOfferOperationUpdatesPath(
+    const std::string& rootDir,
+    const UUID& operationUuid);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);