You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/14 01:11:06 UTC
[1/3] mesos git commit: Made `StatusUpdateManagerProcess` fill in the
latest status.
Repository: mesos
Updated Branches:
refs/heads/master aaf043382 -> c3c070094
Made `StatusUpdateManagerProcess` fill in the latest status.
Review: https://reviews.apache.org/r/64521/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c728f8e7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c728f8e7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c728f8e7
Branch: refs/heads/master
Commit: c728f8e73537bf6f269b9a1cddd06d92dc890dd0
Parents: aaf0433
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Dec 13 16:02:32 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:56 2017 -0800
----------------------------------------------------------------------
.../status_update_manager_process.hpp | 44 +++---
...er_operation_status_update_manager_tests.cpp | 148 ++++++++++++++++---
2 files changed, 157 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 8c9e06f..1536bcc 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -165,6 +165,11 @@ public:
CHECK(streams.contains(streamId));
StatusUpdateStream* stream = streams[streamId].get();
+ if (update.has_latest_status()) {
+ return process::Failure(
+ "Expected " + statusUpdateType + " to not contain 'latest_status'");
+ }
+
// Verify that we didn't get a non-checkpointable update for a
// stream that is checkpointable, and vice-versa.
if (stream->checkpointed() != checkpoint) {
@@ -223,7 +228,7 @@ public:
CHECK_SOME(next);
stream->timeout =
- forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return Nothing();
@@ -286,7 +291,7 @@ public:
} else if (!paused && next.isSome()) {
// Forward the next queued status update.
stream->timeout =
- forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return !terminated;
@@ -378,9 +383,7 @@ public:
LOG(INFO) << "Resuming " << statusUpdateType << " manager";
paused = false;
- foreachpair (const IDType& streamId,
- process::Owned<StatusUpdateStream>& stream,
- streams) {
+ foreachvalue (process::Owned<StatusUpdateStream>& stream, streams) {
const Result<UpdateType>& next = stream->next();
if (next.isSome()) {
@@ -388,8 +391,8 @@ public:
LOG(INFO) << "Sending " << statusUpdateType << " " << update;
- stream->timeout =
- forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ stream->timeout = forward(
+ stream.get(), update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
}
@@ -471,8 +474,8 @@ private:
if (!paused && next.isSome()) {
// Forward the next queued status update.
- stream->timeout =
- forward(streamId, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ stream->timeout = forward(
+ stream.get(), next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
streams[streamId] = std::move(stream);
@@ -507,11 +510,18 @@ private:
// Forwards the status update and starts a timer based on the `duration` to
// check for ACK.
process::Timeout forward(
- const IDType& streamId,
- const UpdateType& update,
+ StatusUpdateStream* stream,
+ const UpdateType& _update,
const Duration& duration)
{
CHECK(!paused);
+ CHECK(!_update.has_latest_status());
+ CHECK_NOTNULL(stream);
+
+ UpdateType update(_update);
+ update.mutable_latest_status()->CopyFrom(
+ stream->pending.empty() ? _update.status()
+ : stream->pending.back().status());
VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
@@ -526,7 +536,7 @@ private:
CheckpointType,
UpdateType>>::self(),
&StatusUpdateManagerProcess::timeout,
- streamId,
+ stream->streamId,
duration)
.timeout();
}
@@ -552,7 +562,7 @@ private:
Duration duration_ =
std::min(duration * 2, slave::STATUS_UPDATE_RETRY_INTERVAL_MAX);
- stream->timeout = forward(streamId, update, duration_);
+ stream->timeout = forward(stream, update, duration_);
}
}
}
@@ -877,6 +887,8 @@ private:
// Returns `true` if the stream is checkpointed, `false` otherwise.
bool checkpointed() { return path.isSome(); }
+ const IDType streamId;
+
bool terminated;
Option<FrameworkID> frameworkId;
Option<process::Timeout> timeout; // Timeout for resending status update.
@@ -888,9 +900,9 @@ private:
const IDType& _streamId,
const Option<std::string>& _path,
Option<int_fd> _fd)
- : terminated(false),
+ : streamId(_streamId),
+ terminated(false),
statusUpdateType(_statusUpdateType),
- streamId(_streamId),
path(_path),
fd(_fd) {}
@@ -988,8 +1000,6 @@ private:
// status update".
const std::string& statusUpdateType;
- const IDType streamId;
-
const Option<std::string> path; // File path of the update stream.
const Option<int_fd> fd; // File descriptor to the update stream.
http://git-wip-us.apache.org/repos/asf/mesos/blob/c728f8e7/src/tests/offer_operation_status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/offer_operation_status_update_manager_tests.cpp b/src/tests/offer_operation_status_update_manager_tests.cpp
index a5327d3..e56fb0e 100644
--- a/src/tests/offer_operation_status_update_manager_tests.cpp
+++ b/src/tests/offer_operation_status_update_manager_tests.cpp
@@ -148,8 +148,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAck)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
@@ -180,8 +183,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
@@ -214,8 +220,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
EXPECT_FALSE(forwardedStatusUpdate2.isReady());
@@ -224,7 +233,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, ResendUnacknowledged)
Clock::settle();
// Verify that the status update is forwarded again.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
@@ -262,8 +271,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, Cleanup)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Cleanup the framework.
statusUpdateManager->cleanup(frameworkId);
@@ -297,8 +309,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
resetStatusUpdateManager();
@@ -329,7 +344,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverCheckpointedStream)
EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
// Check that the status update is resent.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
}
@@ -352,8 +367,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
// Send a non-checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Verify that the stream file is NOT created.
EXPECT_TRUE(!os::exists(getPath(operationUuid)));
@@ -388,8 +406,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyFile)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
@@ -442,8 +463,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverEmptyDirectory)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
@@ -490,8 +514,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RecoverTerminatedStream)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
@@ -545,8 +572,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
@@ -582,8 +612,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
@@ -624,8 +657,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAck)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
@@ -662,8 +698,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
@@ -707,8 +746,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate1);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
resetStatusUpdateManager();
@@ -747,7 +789,7 @@ TEST_F(OfferOperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
EXPECT_EQ(statusUpdate, recoveredUpdate.get());
// Check that the status update is resent.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate2);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
}
@@ -770,8 +812,11 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
// Send a checkpointed offer operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
+
// Verify that the status update is forwarded.
- AWAIT_EXPECT_EQ(statusUpdate, forwardedStatusUpdate);
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
@@ -793,6 +838,73 @@ TEST_F(OfferOperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
}
+
+// This test verifies that the status update manager correctly fills in the
+// latest status when (re)sending status updates.
+TEST_F(OfferOperationStatusUpdateManagerTest, UpdateLatestWhenResending)
+{
+ Future<OfferOperationStatusUpdate> forwardedStatusUpdate1;
+ Future<OfferOperationStatusUpdate> forwardedStatusUpdate2;
+ Future<OfferOperationStatusUpdate> forwardedStatusUpdate3;
+ EXPECT_CALL(statusUpdateProcessor, update(_))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
+ .WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
+
+ const UUID operationUuid = UUID::random();
+
+ const UUID statusUuid1 = UUID::random();
+ OfferOperationStatusUpdate statusUpdate1 = createOfferOperationStatusUpdate(
+ statusUuid1, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
+
+ // Send a checkpointed offer operation status update.
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
+
+ // The status update manager should fill in the `latest_status` field with the
+ // status update we just sent.
+ OfferOperationStatusUpdate expectedStatusUpdate(statusUpdate1);
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate1.status());
+
+ // Verify that the status update is forwarded.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
+
+ EXPECT_FALSE(forwardedStatusUpdate2.isReady());
+
+ // Send another status update.
+ const UUID statusUuid2 = UUID::random();
+ OfferOperationStatusUpdate statusUpdate2 = createOfferOperationStatusUpdate(
+ statusUuid2, operationUuid, OfferOperationState::OFFER_OPERATION_PENDING);
+ AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
+
+ // Advance the clock to trigger a retry of the first update.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Now that another status update was sent, the status update manager should
+ // fill in the `latest_status` field with this new status update.
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate2.status());
+
+ // Verify that the status update is forwarded again.
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
+
+ EXPECT_FALSE(forwardedStatusUpdate3.isReady());
+
+ // Acknowledge the first update, it is NOT a terminal update, so
+ // `acknowledgement` should return `true`. The status update manager
+ // should now send the second status update.
+ AWAIT_EXPECT_TRUE(
+ statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
+
+ // The status update manager should then forward the latest status update.
+ expectedStatusUpdate = statusUpdate2;
+ expectedStatusUpdate.mutable_latest_status()->CopyFrom(
+ statusUpdate2.status());
+
+ AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[3/3] mesos git commit: Fixed a typo in 'agent.proto'.
Posted by gr...@apache.org.
Fixed a typo in 'agent.proto'.
Review: https://reviews.apache.org/r/64560/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3c07009
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3c07009
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3c07009
Branch: refs/heads/master
Commit: c3c07009451197a0348df02cdebcb9a1a293b131
Parents: 3100e9a
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Wed Dec 13 16:03:06 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:57 2017 -0800
----------------------------------------------------------------------
include/mesos/agent/agent.proto | 2 +-
include/mesos/v1/agent/agent.proto | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3c07009/include/mesos/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/agent/agent.proto b/include/mesos/agent/agent.proto
index 6fcca6a..c3b1f22 100644
--- a/include/mesos/agent/agent.proto
+++ b/include/mesos/agent/agent.proto
@@ -310,7 +310,7 @@ message Call {
// config file that describes a resource provider of the specified
// type and name in the resource provider config directory. The
// 'info.id' field should not be set. The resource provider will be
- // relaunched asynchoronously to reflect the changes in its config.
+ // relaunched asynchronously to reflect the changes in its config.
// Note that only config files that exist at agent startup can be
// updated though this call.
//
http://git-wip-us.apache.org/repos/asf/mesos/blob/c3c07009/include/mesos/v1/agent/agent.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/agent/agent.proto b/include/mesos/v1/agent/agent.proto
index 57c3518..280a3b0 100644
--- a/include/mesos/v1/agent/agent.proto
+++ b/include/mesos/v1/agent/agent.proto
@@ -310,7 +310,7 @@ message Call {
// config file that describes a resource provider of the specified
// type and name in the resource provider config directory. The
// 'info.id' field should not be set. The resource provider will be
- // relaunched asynchoronously to reflect the changes in its config.
+ // relaunched asynchronously to reflect the changes in its config.
// Note that only config files that exist at agent startup can be
// updated though this call.
//
[2/3] mesos git commit: Initialized offer operation status update
manager in SLRP.
Posted by gr...@apache.org.
Initialized offer operation status update manager in SLRP.
This patch adds an agent filesystem layout for checkpointing offer
operation status updates for resource providers, and initializes
a status update manager in the storage local resource provider.
Review: https://reviews.apache.org/r/64475/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3100e9aa
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3100e9aa
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3100e9aa
Branch: refs/heads/master
Commit: 3100e9aa0ac9b6bcc92643b145e2730fc862ea39
Parents: c728f8e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Dec 13 16:02:33 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Dec 13 17:05:57 2017 -0800
----------------------------------------------------------------------
src/resource_provider/storage/provider.cpp | 130 +++++++++++++++++++-----
src/slave/paths.cpp | 54 ++++++++++
src/slave/paths.hpp | 23 +++++
3 files changed, 179 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index e806f44..2fd4a3b 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -66,6 +66,8 @@
#include "slave/paths.hpp"
#include "slave/state.hpp"
+#include "status_update_manager/offer_operation.hpp"
+
namespace http = process::http;
using std::accumulate;
@@ -327,6 +329,7 @@ private:
Future<Nothing> recover();
Future<Nothing> recoverServices();
Future<Nothing> recoverVolumes();
+ Future<Nothing> recoverStatusUpdates();
void doReliableRegistration();
Future<Nothing> reconcile();
@@ -359,8 +362,8 @@ private:
Future<Nothing> deleteVolume(const string& volumeId);
// Applies the offer operation. Conventional operations will be
- // synchoronusly applied.
- Future<Nothing> applyOfferOperation(const UUID& operationUuid);
+ // synchronously applied.
+ Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
const Resource& resource,
@@ -375,9 +378,15 @@ private:
const Try<vector<ResourceConversion>>& conversions);
void checkpointResourceProviderState();
- void sendResourceProviderStateUpdate();
void checkpointVolumeState(const string& volumeId);
+ void sendResourceProviderStateUpdate();
+
+ // NOTE: This is a callback for the status update manager and should
+ // not be called directly.
+ void sendOfferOperationStatusUpdate(
+ const OfferOperationStatusUpdate& statusUpdate);
+
enum State
{
RECOVERING,
@@ -400,6 +409,7 @@ private:
hashmap<string, ProfileData> profiles;
process::grpc::client::Runtime runtime;
Owned<v1::resource_provider::Driver> driver;
+ OfferOperationStatusUpdateManager statusUpdateManager;
ContainerID controllerContainerId;
ContainerID nodeContainerId;
@@ -611,15 +621,15 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
// We replay all pending operations here, so that if a volume is
// actually created before the last failover, it will be reflected
// in the updated total resources before we do the reconciliation.
- // NOTE: `applyOfferOperation` will remove the applied operation
+ // NOTE: `_applyOfferOperation` will remove the applied operation
// from the list of pending operations, so we make a copy of keys
// here.
foreach (const UUID& uuid, pendingOperations.keys()) {
- applyOfferOperation(uuid)
+ _applyOfferOperation(uuid)
.onAny(defer(self(), [=](const Future<Nothing>& future) {
if (!future.isReady()) {
LOG(ERROR)
- << "Failed to apply operation " << uuid << ": "
+ << "Failed to apply offer operation with UUID " << uuid << ": "
<< (future.isFailed() ? future.failure() : "future discarded");
}
}));
@@ -841,6 +851,27 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
}
+Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+{
+ CHECK(info.has_id());
+
+ const string resourceProviderDir = slave::paths::getResourceProviderPath(
+ metaDir, slaveId, info.type(), info.name(), info.id());
+
+ statusUpdateManager.initialize(
+ defer(self(), &Self::sendOfferOperationStatusUpdate, lambda::_1),
+ std::bind(
+ &slave::paths::getOfferOperationUpdatesPath,
+ resourceProviderDir,
+ lambda::_1));
+
+ statusUpdateManager.pause();
+
+ // TODO(chhsiao): Recover status updates.
+ return Nothing();
+}
+
+
void StorageLocalResourceProviderProcess::doReliableRegistration()
{
if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
@@ -872,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration()
Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
{
- return importResources()
+ return recoverStatusUpdates()
+ .then(defer(self(), &Self::importResources))
.then(defer(self(), [=](Resources importedResources) {
// NODE: If a resource in the checkpointed total resources is
// missing in the imported resources, we will still keep it if it
@@ -945,6 +977,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
}
sendResourceProviderStateUpdate();
+ statusUpdateManager.resume();
state = READY;
@@ -1005,7 +1038,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
pendingOperations[uuid.get()] = operation;
checkpointResourceProviderState();
- applyOfferOperation(uuid.get())
+ _applyOfferOperation(uuid.get())
.onAny(defer(self(), [=](const Future<Nothing>& future) {
if (!future.isReady()) {
LOG(ERROR)
@@ -1942,8 +1975,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
if (volumes.contains(volumeInfo.id())) {
// The resource provider failed over after the last
- // `CreateVolume` call, but before the operation status
- // was checkpointed.
+ // `CreateVolume` call, but before the offer operation
+ // status was checkpointed.
CHECK_EQ(csi::state::VolumeState::CREATED,
volumes.at(volumeInfo.id()).state.state());
} else {
@@ -2027,7 +2060,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
}
} else {
// The resource provider failed over after the last `DeleteVolume`
- // call, but before the operation status was checkpointed.
+ // call, but before the offer operation status was checkpointed.
CHECK(!os::exists(volumePath));
}
@@ -2038,7 +2071,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume(
}
-Future<Nothing> StorageLocalResourceProviderProcess::applyOfferOperation(
+Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
const UUID& operationUuid)
{
Future<vector<ResourceConversion>> conversions;
@@ -2205,7 +2238,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
created = resource.disk().source().id();
} else {
// We use the operation UUID as the name of the volume, so the same
- // operation will create the same volume after recovery.
+ // offer operation will create the same volume after recovery.
// TODO(chhsiao): Call `CreateVolume` sequentially with other create
// or delete operations.
// TODO(chhsiao): Send `UPDATE_STATE` for RAW resources.
@@ -2389,8 +2422,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
auto err = [](const UUID& operationUuid, const string& message) {
LOG(ERROR)
- << "Failed to send status update for offer operation " << operationUuid
- << ": " << message;
+ << "Failed to send status update for offer operation with UUID "
+ << operationUuid << ": " << message;
};
driver->send(evolve(call))
@@ -2426,8 +2459,28 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
const string statePath = slave::paths::getResourceProviderStatePath(
metaDir, slaveId, info.type(), info.name(), info.id());
- CHECK_SOME(slave::state::checkpoint(statePath, state))
- << "Failed to checkpoint resource provider state to '" << statePath << "'";
+ Try<Nothing> checkpoint = slave::state::checkpoint(statePath, state);
+ CHECK_SOME(checkpoint)
+ << "Failed to checkpoint resource provider state to '" << statePath << "': "
+ << checkpoint.error();
+}
+
+
+void StorageLocalResourceProviderProcess::checkpointVolumeState(
+ const string& volumeId)
+{
+ const string statePath = csi::paths::getVolumeStatePath(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().plugin().type(),
+ info.storage().plugin().name(),
+ volumeId);
+
+ Try<Nothing> checkpoint =
+ slave::state::checkpoint(statePath, volumes.at(volumeId).state);
+
+ CHECK_SOME(checkpoint)
+ << "Failed to checkpoint volume state to '" << statePath << "':"
+ << checkpoint.error();
}
@@ -2443,8 +2496,8 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
const Event::ApplyOfferOperation& operation,
pendingOperations) {
// TODO(chhsiao): Maintain a list of terminated but unacknowledged
- // offer operations in memory and reconstruc that during recovery
- // by querying status update manager.
+ // offer operations in memory and reconstruct it during recovery
+ // by querying the status update manager.
update->add_operations()->CopyFrom(
protobuf::createOfferOperation(
operation.info(),
@@ -2475,17 +2528,38 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
}
-void StorageLocalResourceProviderProcess::checkpointVolumeState(
- const string& volumeId)
+void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate(
+ const OfferOperationStatusUpdate& statusUpdate)
{
- const string statePath = csi::paths::getVolumeStatePath(
- slave::paths::getCsiRootDir(workDir),
- info.storage().plugin().type(),
- info.storage().plugin().name(),
- volumeId);
+ Call call;
+ call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
- CHECK_SOME(slave::state::checkpoint(statePath, volumes.at(volumeId).state))
- << "Failed to checkpoint volume state to '" << statePath << "'";
+ Call::UpdateOfferOperationStatus* update =
+ call.mutable_update_offer_operation_status();
+ update->set_operation_uuid(statusUpdate.operation_uuid());
+ update->mutable_status()->CopyFrom(statusUpdate.status());
+
+ if (statusUpdate.has_framework_id()) {
+ update->mutable_framework_id()->CopyFrom(statusUpdate.framework_id());
+ }
+
+ // The latest status should have been set by the status update manager.
+ CHECK(statusUpdate.has_latest_status());
+ update->mutable_latest_status()->CopyFrom(statusUpdate.latest_status());
+
+ auto err = [](const UUID& uuid, const string& message) {
+ LOG(ERROR)
+ << "Failed to send status update for offer operation with UUID " << uuid
+ << ": " << message;
+ };
+
+ Try<UUID> uuid = UUID::fromBytes(statusUpdate.operation_uuid());
+ CHECK_SOME(uuid);
+
+ driver->send(evolve(call))
+ .onFailed(std::bind(err, uuid.get(), lambda::_1))
+ .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index b8004e7..f9f0c78 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -65,6 +65,7 @@ const char TASK_UPDATES_FILE[] = "task.updates";
const char RESOURCES_INFO_FILE[] = "resources.info";
const char RESOURCES_TARGET_FILE[] = "resources.target";
const char RESOURCE_PROVIDER_STATE_FILE[] = "resource_provider.state";
+const char OFFER_OPERATION_UPDATES_FILE[] = "operation.updates";
const char CONTAINERS_DIR[] = "containers";
@@ -75,6 +76,7 @@ const char EXECUTORS_DIR[] = "executors";
const char EXECUTOR_RUNS_DIR[] = "runs";
const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
const char RESOURCE_PROVIDERS_DIR[] = "resource_providers";
+const char OFFER_OPERATIONS_DIR[] = "operations";
Try<ExecutorRunPath> parseExecutorRunPath(
@@ -545,6 +547,58 @@ string getLatestResourceProviderPath(
}
+Try<list<string>> getOfferOperationPaths(
+ const string& rootDir)
+{
+ return fs::list(path::join(rootDir, OFFER_OPERATIONS_DIR, "*"));
+}
+
+
+string getOfferOperationPath(
+ const string& rootDir,
+ const UUID& operationUuid)
+{
+ return path::join(rootDir, OFFER_OPERATIONS_DIR, operationUuid.toString());
+}
+
+
+Try<UUID> parseOfferOperationPath(
+ const string& rootDir,
+ const string& dir)
+{
+ // TODO(chhsiao): Consider using `<regex>`, which requires GCC 4.9+.
+
+ // Make sure there's a separator at the end of the prefix so that we
+ // don't accidentally slice off part of a directory.
+ const string prefix = path::join(rootDir, OFFER_OPERATIONS_DIR, "");
+
+ if (!strings::startsWith(dir, prefix)) {
+ return Error(
+ "Directory '" + dir + "' does not fall under operations directory '" +
+ prefix + "'");
+ }
+
+ Try<UUID> operationUuid = UUID::fromString(Path(dir).basename());
+ if (operationUuid.isError()) {
+ return Error(
+ "Could not decode offer operation UUID from string '" +
+ Path(dir).basename() + "': " + operationUuid.error());
+ }
+
+ return operationUuid.get();
+}
+
+
+string getOfferOperationUpdatesPath(
+ const string& rootDir,
+ const UUID& operationUuid)
+{
+ return path::join(
+ getOfferOperationPath(rootDir, operationUuid),
+ OFFER_OPERATION_UPDATES_FILE);
+}
+
+
string getResourcesInfoPath(
const string& rootDir)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/3100e9aa/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d645d87..bae68d0 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -23,6 +23,7 @@
#include <mesos/mesos.hpp>
#include <stout/try.hpp>
+#include <stout/uuid.hpp>
namespace mesos {
namespace internal {
@@ -79,6 +80,9 @@ namespace paths {
// | | |-- latest (symlink)
// | | |-- <resource_provider_id>
// | | |-- resource_provider.state
+// | | |-- operations
+// | | |-- <operation_uuid>
+// | | |-- operation.updates
// | |-- frameworks
// | |-- <framework_id>
// | |-- framework.info
@@ -349,6 +353,25 @@ std::string getLatestResourceProviderPath(
const std::string& resourceProviderName);
+Try<std::list<std::string>> getOfferOperationPaths(
+ const std::string& rootDir);
+
+
+std::string getOfferOperationPath(
+ const std::string& rootDir,
+ const UUID& operationUuid);
+
+
+Try<UUID> parseOfferOperationPath(
+ const std::string& rootDir,
+ const std::string& dir);
+
+
+std::string getOfferOperationUpdatesPath(
+ const std::string& rootDir,
+ const UUID& operationUuid);
+
+
std::string getResourcesInfoPath(
const std::string& rootDir);