You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/13 00:59:18 UTC
[1/7] mesos git commit: Added pause/resume methods to
`OfferOperationStatusUpdateManager`.
Repository: mesos
Updated Branches:
refs/heads/master c320ab3b2 -> d0d5f5c46
Added pause/resume methods to `OfferOperationStatusUpdateManager`.
Review: https://reviews.apache.org/r/64517/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/07630ccf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/07630ccf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/07630ccf
Branch: refs/heads/master
Commit: 07630ccfaefccb500d13f8940d9d3f397bdef32b
Parents: c320ab3
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:12 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:31 2017 -0800
----------------------------------------------------------------------
src/status_update_manager/offer_operation.cpp | 22 ++++++++++++++++++++++
src/status_update_manager/offer_operation.hpp | 10 ++++++++++
2 files changed, 32 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/07630ccf/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index f66690e..984969e 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -126,5 +126,27 @@ void OfferOperationStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
frameworkId);
}
+
+void OfferOperationStatusUpdateManager::pause()
+{
+ dispatch(
+ process.get(),
+ &StatusUpdateManagerProcess<
+ UUID,
+ OfferOperationStatusUpdateRecord,
+ OfferOperationStatusUpdate>::pause);
+}
+
+
+void OfferOperationStatusUpdateManager::resume()
+{
+ dispatch(
+ process.get(),
+ &StatusUpdateManagerProcess<
+ UUID,
+ OfferOperationStatusUpdateRecord,
+ OfferOperationStatusUpdate>::resume);
+}
+
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/07630ccf/src/status_update_manager/offer_operation.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.hpp b/src/status_update_manager/offer_operation.hpp
index 8751f0b..e7c2b0c 100644
--- a/src/status_update_manager/offer_operation.hpp
+++ b/src/status_update_manager/offer_operation.hpp
@@ -44,7 +44,11 @@ typedef StatusUpdateManagerProcess<
class OfferOperationStatusUpdateManager
{
public:
+ // NOTE: Unless first paused, the status update manager will forward updates
+ // as soon as possible; for example, during recovery or as soon as the first
+ // status update is processed.
OfferOperationStatusUpdateManager();
+
~OfferOperationStatusUpdateManager();
OfferOperationStatusUpdateManager(
@@ -107,6 +111,12 @@ public:
// responsible for garbage collection after this method has returned.
void cleanup(const FrameworkID& frameworkId);
+ // Stop forwarding status updates until `resume()` is called.
+ void pause();
+
+ // Resume forwarding status updates until `pause()` is called.
+ void resume();
+
private:
process::Owned<
StatusUpdateManagerProcess<
[7/7] mesos git commit: Made the `StatusUpdateManagerProcess` process
ID configurable.
Posted by gr...@apache.org.
Made the `StatusUpdateManagerProcess` process ID configurable.
Review: https://reviews.apache.org/r/64509/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d7da9b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d7da9b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d7da9b7
Branch: refs/heads/master
Commit: 9d7da9b77ad33530e0a436614479cafd08c13ec3
Parents: e70097c
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:19 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800
----------------------------------------------------------------------
src/status_update_manager/offer_operation.cpp | 4 +++-
src/status_update_manager/status_update_manager_process.hpp | 6 ++++--
2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d7da9b7/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index 8ffce61..986310e 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -38,7 +38,9 @@ OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
new StatusUpdateManagerProcess<
UUID,
OfferOperationStatusUpdateRecord,
- OfferOperationStatusUpdate>("offer operation status update"))
+ OfferOperationStatusUpdate>(
+ "offer-operation-status-update-manager",
+ "offer operation status update"))
{
spawn(process.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d7da9b7/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 935f754..8c9e06f 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -108,8 +108,10 @@ public:
State() : streams(), errors(0) {}
};
- StatusUpdateManagerProcess(const std::string& _statusUpdateType)
- : process::ProcessBase(process::ID::generate("status-update-manager")),
+ StatusUpdateManagerProcess(
+ const std::string& id,
+ const std::string& _statusUpdateType)
+ : process::ProcessBase(process::ID::generate(id)),
statusUpdateType(_statusUpdateType),
paused(false) {}
[2/7] mesos git commit: Improved the logging in
`StatusUpdateManagerProcess`.
Posted by gr...@apache.org.
Improved the logging in `StatusUpdateManagerProcess`.
This patch adds the type of status update handled by each instance of
`StatusUpdateManagerProcess` to its log and error messages.
Review: https://reviews.apache.org/r/64472/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3fa8d64e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3fa8d64e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3fa8d64e
Branch: refs/heads/master
Commit: 3fa8d64e84861bd6023cdab9296b47ab24f581b8
Parents: 07630cc
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:13 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:46 2017 -0800
----------------------------------------------------------------------
src/status_update_manager/offer_operation.cpp | 9 +-
.../status_update_manager_process.hpp | 149 +++++++++++--------
2 files changed, 90 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/offer_operation.cpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/offer_operation.cpp b/src/status_update_manager/offer_operation.cpp
index 984969e..8ffce61 100644
--- a/src/status_update_manager/offer_operation.cpp
+++ b/src/status_update_manager/offer_operation.cpp
@@ -34,10 +34,11 @@ namespace mesos {
namespace internal {
OfferOperationStatusUpdateManager::OfferOperationStatusUpdateManager()
- : process(new StatusUpdateManagerProcess<
- UUID,
- OfferOperationStatusUpdateRecord,
- OfferOperationStatusUpdate>())
+ : process(
+ new StatusUpdateManagerProcess<
+ UUID,
+ OfferOperationStatusUpdateRecord,
+ OfferOperationStatusUpdate>("offer operation status update"))
{
spawn(process.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3fa8d64e/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index 1ac6441..a44551f 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -108,8 +108,9 @@ public:
State() : streams(), errors(0) {}
};
- StatusUpdateManagerProcess()
+ StatusUpdateManagerProcess(const std::string& _statusUpdateType)
: process::ProcessBase(process::ID::generate("status-update-manager")),
+ statusUpdateType(_statusUpdateType),
paused(false) {}
StatusUpdateManagerProcess(const StatusUpdateManagerProcess& that) = delete;
@@ -144,7 +145,7 @@ public:
const IDType& streamId,
bool checkpoint)
{
- LOG(INFO) << "Received status update " << update;
+ LOG(INFO) << "Received " << statusUpdateType << " " << update;
if (!streams.contains(streamId)) {
Try<Nothing> create =
@@ -166,7 +167,7 @@ public:
// stream that is checkpointable, and vice-versa.
if (stream->checkpointed() != checkpoint) {
return process::Failure(
- "Mismatched checkpoint value for status update " +
+ "Mismatched checkpoint value for " + statusUpdateType + " " +
stringify(update) + " (expected checkpoint=" +
stringify(stream->checkpointed()) + " actual checkpoint=" +
stringify(checkpoint) + ")");
@@ -176,8 +177,8 @@ public:
// of the stream.
if (update.has_framework_id() != stream->frameworkId.isSome()) {
return process::Failure(
- "Mismatched framework ID for status update " + stringify(update) +
- " (expected " +
+ "Mismatched framework ID for " + statusUpdateType +
+ " " + stringify(update) + " (expected " +
(stream->frameworkId.isSome()
? stringify(stream->frameworkId.get())
: "no framework ID") +
@@ -191,7 +192,8 @@ public:
if (update.has_framework_id() &&
update.framework_id() != stream->frameworkId.get()) {
return process::Failure(
- "Mismatched framework ID for status update " + stringify(update) +
+ "Mismatched framework ID for " + statusUpdateType +
+ " " + stringify(update) +
" (expected " + stringify(stream->frameworkId.get()) +
" actual " + stringify(update.framework_id()) + ")");
}
@@ -238,14 +240,16 @@ public:
const IDType& streamId,
const UUID& uuid)
{
- LOG(INFO) << "Received status update acknowledgement (UUID: " << uuid << ")"
+ LOG(INFO) << "Received " << statusUpdateType
+ << " acknowledgement (UUID: " << uuid << ")"
<< " for stream " << stringify(streamId);
// This might happen if we haven't completed recovery yet or if the
// acknowledgement is for a stream that has been cleaned up.
if (!streams.contains(streamId)) {
return process::Failure(
- "Cannot find the status update stream " + stringify(streamId));
+ "Cannot find the " + statusUpdateType + " stream " +
+ stringify(streamId));
}
StatusUpdateStream* stream = streams[streamId].get();
@@ -258,7 +262,8 @@ public:
}
if (!result.get()) {
- return process::Failure("Duplicate status update acknowledgement");
+ return process::Failure(
+ "Duplicate " + statusUpdateType + " acknowledgement");
}
stream->timeout = None();
@@ -272,8 +277,8 @@ public:
bool terminated = stream->terminated;
if (terminated) {
if (next.isSome()) {
- LOG(WARNING) << "Acknowledged a terminal status update but updates are"
- << " still pending";
+ LOG(WARNING) << "Acknowledged a terminal " << statusUpdateType
+ << " but updates are still pending";
}
cleanupStatusUpdateStream(streamId);
} else if (!paused && next.isSome()) {
@@ -297,7 +302,7 @@ public:
const std::list<IDType>& streamIds,
bool strict)
{
- LOG(INFO) << "Recovering status update manager";
+ LOG(INFO) << "Recovering " << statusUpdateType << " manager";
State state;
foreach (const IDType& streamId, streamIds) {
@@ -306,7 +311,7 @@ public:
if (result.isError()) {
const std::string message =
- "Failed to recover status update stream " +
+ "Failed to recover " + statusUpdateType + " stream " +
stringify(streamId) + ": " + result.error();
LOG(WARNING) << message;
@@ -349,8 +354,8 @@ public:
// responsible for garbage collection after this method has returned.
void cleanup(const FrameworkID& frameworkId)
{
- LOG(INFO) << "Closing status update streams for framework"
- << " '" << frameworkId << "'";
+ LOG(INFO) << "Closing " << statusUpdateType << " streams of framework "
+ << frameworkId;
if (frameworkStreams.contains(frameworkId)) {
foreach (const IDType& streamId,
@@ -362,13 +367,13 @@ public:
void pause()
{
- LOG(INFO) << "Pausing sending status updates";
+ LOG(INFO) << "Pausing " << statusUpdateType << " manager";
paused = true;
}
void resume()
{
- LOG(INFO) << "Resuming sending status updates";
+ LOG(INFO) << "Resuming " << statusUpdateType << " manager";
paused = false;
foreachpair (const IDType& streamId,
@@ -379,7 +384,7 @@ public:
if (next.isSome()) {
const UpdateType& update = next.get();
- LOG(WARNING) << "Sending status update " << update;
+ LOG(INFO) << "Sending " << statusUpdateType << " " << update;
stream->timeout =
forward(streamId, update, slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
@@ -399,11 +404,12 @@ private:
const Option<FrameworkID>& frameworkId,
bool checkpoint)
{
- VLOG(1) << "Creating status update stream " << stringify(streamId)
- << " checkpoint=" << stringify(checkpoint);
+ VLOG(1) << "Creating " << statusUpdateType << " stream "
+ << stringify(streamId) << " checkpoint=" << stringify(checkpoint);
Try<process::Owned<StatusUpdateStream>> stream =
StatusUpdateStream::create(
+ statusUpdateType,
streamId,
frameworkId,
checkpoint ? Option<std::string>(getPath(streamId)) : None());
@@ -427,12 +433,14 @@ private:
const IDType& streamId,
bool strict)
{
- VLOG(1) << "Recovering status update stream " << stringify(streamId);
+ VLOG(1) << "Recovering " << statusUpdateType << " stream "
+ << stringify(streamId);
Result<std::pair<
process::Owned<StatusUpdateStream>,
typename StatusUpdateStream::State>> result =
- StatusUpdateStream::recover(streamId, getPath(streamId), strict);
+ StatusUpdateStream::recover(
+ statusUpdateType, streamId, getPath(streamId), strict);
if (result.isError()) {
return Error(result.error());
@@ -472,10 +480,11 @@ private:
void cleanupStatusUpdateStream(const IDType& streamId)
{
- VLOG(1) << "Cleaning up status update stream " << stringify(streamId);
+ VLOG(1) << "Cleaning up " << statusUpdateType << " stream "
+ << stringify(streamId);
- CHECK(streams.contains(streamId)) << "Cannot find the status update stream "
- << stringify(streamId);
+ CHECK(streams.contains(streamId)) << "Cannot find " << statusUpdateType
+ << " stream " << stringify(streamId);
StatusUpdateStream* stream = streams[streamId].get();
@@ -502,7 +511,7 @@ private:
{
CHECK(!paused);
- VLOG(1) << "Forwarding status update " << update;
+ VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
forwardCallback(update);
@@ -535,7 +544,7 @@ private:
if (stream->timeout->expired()) {
const UpdateType& update = stream->pending.front();
- LOG(WARNING) << "Resending status update " << update;
+ LOG(WARNING) << "Resending " << statusUpdateType << " " << update;
// Bounded exponential backoff.
Duration duration_ =
@@ -546,6 +555,10 @@ private:
}
}
+ // Type of status updates handled by the stream, e.g., "offer operation
+ // status update".
+ const std::string statusUpdateType;
+
lambda::function<void(UpdateType)> forwardCallback;
lambda::function<const std::string(const IDType&)> getPath;
@@ -576,13 +589,15 @@ private:
if (close.isError()) {
CHECK_SOME(path);
- LOG(WARNING) << "Failed to close file '" << path.get()
- << "': " << close.error();
+ LOG(WARNING) << "Failed to close " << statusUpdateType
+ << " stream file '" << path.get() << "': "
+ << close.error();
}
}
}
static Try<process::Owned<StatusUpdateStream>> create(
+ const std::string& statusUpdateType,
const IDType& streamId,
const Option<FrameworkID>& frameworkId,
const Option<std::string>& path)
@@ -591,8 +606,7 @@ private:
if (path.isSome()) {
if (os::exists(path.get())) {
- return Error(
- "The status updates file '" + path.get() + "' already exists.");
+ return Error("The file '" + path.get() + "' already exists");
}
// Create the base updates directory, if it doesn't exist.
@@ -611,15 +625,14 @@ private:
if (result.isError()) {
return Error(
- "Failed to open '" + path.get() +
- "' for status updates: " + result.error());
+ "Failed to open '" + path.get() + "' : " + result.error());
}
fd = result.get();
}
process::Owned<StatusUpdateStream> stream(
- new StatusUpdateStream(streamId, path, fd));
+ new StatusUpdateStream(statusUpdateType, streamId, path, fd));
stream->frameworkId = frameworkId;
@@ -627,8 +640,11 @@ private:
}
- static Result<std::pair<process::Owned<StatusUpdateStream>, State>>
- recover(const IDType& streamId, const std::string& path, bool strict)
+ static Result<std::pair<process::Owned<StatusUpdateStream>, State>> recover(
+ const std::string& statusUpdateType,
+ const IDType& streamId,
+ const std::string& path,
+ bool strict)
{
if (os::exists(Path(path).dirname()) && !os::exists(path)) {
// This could happen if the process died before it checkpointed any
@@ -640,15 +656,14 @@ private:
Try<int_fd> fd = os::open(path, O_SYNC | O_RDWR | O_CLOEXEC);
if (fd.isError()) {
- return Error(
- "Failed to open status updates stream file '" + path +
- "': " + fd.error());
+ return Error("Failed to open '" + path + "': " + fd.error());
}
process::Owned<StatusUpdateStream> stream(
- new StatusUpdateStream(streamId, path, fd.get()));
+ new StatusUpdateStream(statusUpdateType, streamId, path, fd.get()));
- VLOG(1) << "Replaying updates for stream " << stringify(streamId);
+ VLOG(1) << "Replaying " << statusUpdateType << " stream "
+ << stringify(streamId);
// Read the updates/acknowledgments, building both the stream's in-memory
// structures and the state object which will be returned.
@@ -674,7 +689,7 @@ private:
if (update.isNone()) {
return Error(
- "Unexpected status update acknowledgment"
+ "Unexpected " + statusUpdateType + " acknowledgment"
" (UUID: " + UUID::fromBytes(record->uuid())->toString() +
") for stream " + stringify(streamId));
}
@@ -696,23 +711,20 @@ private:
Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR);
if (currentPosition.isError()) {
return Error(
- "Failed to lseek status updates stream file '" + path +
- "': " + currentPosition.error());
+ "Failed to lseek file '" + path + "': " + currentPosition.error());
}
Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get());
if (truncated.isError()) {
return Error(
- "Failed to truncate status updates file '" + path +
- "': " + truncated.error());
+ "Failed to truncate file '" + path + "': " + truncated.error());
}
// After reading a non-corrupted updates file, `record` should be `none`.
if (record.isError()) {
std::string message =
- "Failed to read status updates file '" + path +
- "': " + record.error();
+ "Failed to read file '" + path + "': " + record.error();
if (strict) {
return Error(message);
@@ -732,8 +744,7 @@ private:
if (removed.isError()) {
return Error(
- "Failed to remove status updates file '" + path +
- "': " + removed.error());
+ "Failed to remove file '" + path + "': " + removed.error());
}
return None();
@@ -766,14 +777,15 @@ private:
// Check that this status update has not already been acknowledged.
if (acknowledged.contains(statusUuid.get())) {
- LOG(WARNING) << "Ignoring status update " << update
+ LOG(WARNING) << "Ignoring " << statusUpdateType << " " << update
<< " that has already been acknowledged";
return false;
}
// Check that this update has not already been received.
if (received.contains(statusUuid.get())) {
- LOG(WARNING) << "Ignoring duplicate status update " << update;
+ LOG(WARNING) << "Ignoring duplicate " << statusUpdateType << " "
+ << update;
return false;
}
@@ -807,15 +819,15 @@ private:
// acknowledgments for both the original and the retried update.
if (_update.isNone()) {
return Error(
- "Unexpected status update acknowledgment (UUID: " +
- statusUuid.toString() + ") for stream " + stringify(streamId));
+ "Unexpected acknowledgment (UUID: " + statusUuid.toString() +
+ ") for " + statusUpdateType + " stream " + stringify(streamId));
}
const UpdateType& update = _update.get();
if (acknowledged.contains(statusUuid)) {
- LOG(WARNING) << "Duplicate status update acknowledgment"
- << " for update " << update;
+ LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType
+ << " " << update;
return false;
}
@@ -830,9 +842,10 @@ private:
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (statusUuid != updateStatusUuid.get()) {
- LOG(WARNING) << "Unexpected status update acknowledgement"
- << " (received " << statusUuid << ", expecting "
- << updateStatusUuid.get() << ") for update " << update;
+ LOG(WARNING) << "Unexpected " << statusUpdateType
+ << " acknowledgment (received " << statusUuid
+ << ", expecting " << updateStatusUuid.get() << ") for "
+ << update;
return false;
}
@@ -869,10 +882,15 @@ private:
private:
StatusUpdateStream(
+ const std::string& _statusUpdateType,
const IDType& _streamId,
const Option<std::string>& _path,
Option<int_fd> _fd)
- : terminated(false), streamId(_streamId), path(_path), fd(_fd) {}
+ : terminated(false),
+ statusUpdateType(_statusUpdateType),
+ streamId(_streamId),
+ path(_path),
+ fd(_fd) {}
// Handles the status update and writes it to disk, if necessary.
//
@@ -890,8 +908,8 @@ private:
// Checkpoint the update if necessary.
if (checkpointed()) {
- LOG(INFO) << "Checkpointing " << type << " for status update "
- << update;
+ LOG(INFO) << "Checkpointing " << type << " for " << statusUpdateType
+ << " " << update;
CHECK_SOME(fd);
@@ -914,8 +932,7 @@ private:
Try<Nothing> write = ::protobuf::write(fd.get(), record);
if (write.isError()) {
error =
- "Failed to write acknowledgement for status update " +
- stringify(update) + " to '" + path.get() + "': " + write.error();
+ "Failed to write to file '" + path.get() + "': " + write.error();
return Error(error.get());
}
}
@@ -965,6 +982,10 @@ private:
}
}
+ // Type of status updates handled by the stream, e.g., "offer operation
+ // status update".
+ const std::string& statusUpdateType;
+
const IDType streamId;
const Option<std::string> path; // File path of the update stream.
[4/7] mesos git commit: Made the agent respond to reconciliation
requests for default resources.
Posted by gr...@apache.org.
Made the agent respond to reconciliation requests for default resources.
Review: https://reviews.apache.org/r/64548/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d0d5f5c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d0d5f5c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d0d5f5c4
Branch: refs/heads/master
Commit: d0d5f5c46de7dabb08d54b7c7f8c7f31e184de52
Parents: 01832c0
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:19:19 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 29 ++++++++++++++++++++++++++---
1 file changed, 26 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d0d5f5c4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 9584326..d997b42 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3898,14 +3898,37 @@ void Slave::reconcileOfferOperations(
{
bool containsResourceProviderOperations = false;
- // TODO(greggomann): Implement reconciliation for offer
- // operations on the agent's default resources.
foreach (
const ReconcileOfferOperationsMessage::Operation& operation,
message.operations()) {
if (operation.has_resource_provider_id()) {
containsResourceProviderOperations = true;
- break;
+ continue;
+ }
+
+ Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+ CHECK_SOME(operationUuid);
+
+ // The master reconciles when it notices that an operation is missing from
+ // an `UpdateSlaveMessage`. If we cannot find an operation in the agent
+ // state, we send an update to inform the master. If we do find the
+ // operation, then the master and agent state are consistent and we do not
+ // need to do anything.
+ OfferOperation* storedOperation = getOfferOperation(operationUuid.get());
+ if (storedOperation == nullptr) {
+ // For agent default resources, we send best-effort offer operation status
+ // updates to the master. This is satisfactory because a dropped message
+ // would imply a subsequent agent reregistration, after which an
+ // `UpdateSlaveMessage` would be sent with pending operations.
+ OfferOperationStatusUpdate update =
+ protobuf::createOfferOperationStatusUpdate(
+ operationUuid.get(),
+ protobuf::createOfferOperationStatus(OFFER_OPERATION_DROPPED),
+ None(),
+ None(),
+ info.id());
+
+ send(master.get(), update);
}
}
[6/7] mesos git commit: Avoided returning prematurely in an agent
handler.
Posted by gr...@apache.org.
Avoided returning prematurely in an agent handler.
Review: https://reviews.apache.org/r/64504/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/01832c02
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/01832c02
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/01832c02
Branch: refs/heads/master
Commit: 01832c0212c46d769d2023c61f2afd22fd21daa2
Parents: 5c91546
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:19:00 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/01832c02/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5869e73..9584326 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7225,7 +7225,7 @@ void Slave::handleResourceProviderMessage(
<< update.status().operation_id() << "' (uuid: "
<< operationUUID->toString() << ") for framework "
<< update.framework_id();
- return;
+ break;
}
updateOfferOperation(operation, update);
[3/7] mesos git commit: Made master reconcile known offer operations
with agent.
Posted by gr...@apache.org.
Made master reconcile known offer operations with agent.
In cases where the agent fails over or where an `UpdateSlaveMessage`
races with an `ApplyOfferOperationMessage`, it's possible that the
master knows about an offer operation which is not contained in an
`UpdateSlaveMessage`. In such cases, the master should send a
`ReconcileOfferOperationsMessage` to the agent. The agent will
then respond by sending OFFER_OPERATION_DROPPED status updates for
any operations which it does not know about.
Review: https://reviews.apache.org/r/64464/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c91546b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c91546b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c91546b
Branch: refs/heads/master
Commit: 5c91546babf59a42c4e3fc98d5c712e8d1ddd3d3
Parents: 9d7da9b
Author: Greg Mann <gr...@mesosphere.io>
Authored: Tue Dec 12 16:18:41 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800
----------------------------------------------------------------------
src/master/master.cpp | 50 ++++++++++++++++++++++++++++------------------
1 file changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c91546b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index efe8b8f..806fbc2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7575,6 +7575,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
}
}
+ ReconcileOfferOperationsMessage reconcile;
+
// Update master and allocator state.
foreachpair (
const Option<ResourceProviderID>& providerId,
@@ -7637,29 +7639,35 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
} else {
// If this is a known resource provider or agent its total capacity cannot
// have changed, and it would not know about any non-terminal offer
- // operations not already known to the master. It might however have not
- // received an offer operations since the resource provider or agent fell
- // over before the message could be received. We need to remove these
- // operations from our state.
-
- // Reconcile offer operations. This includes recovering
- // resources in used by operations which did not reach the
- // agent or resource provider.
+ // operations not already known to the master. However, it might not have
+ // received an offer operation for a couple different reasons:
+ // - The resource provider or agent could have failed over before the
+ // operation's `ApplyOfferOperationMessage` could be received.
+ // - The operation's `ApplyOfferOperationMessage` could have raced with
+ // this `UpdateSlaveMessage`.
+ //
+ // In both of these cases, we need to reconcile such operations explicitly
+ // with the agent. For operations which the agent or resource provider
+ // does not recognize, an OFFER_OPERATION_DROPPED status update will be
+ // generated and the master will remove the operation from its state upon
+ // receipt of that update.
if (provider.oldOfferOperations.isSome()) {
foreachkey (const UUID& uuid, provider.oldOfferOperations.get()) {
if (provider.newOfferOperations.isNone() ||
!provider.newOfferOperations->contains(uuid)) {
- // TODO(bbannier): Instead of simply dropping an operation with
- // `removeOfferOperation` here we should instead send a `Reconcile`
- // message with a failed state to the agent so its status update
- // manager can reliably deliver the operation status to the
- // framework.
- LOG(WARNING) << "Dropping known offer operation " << uuid.toString()
- << " since it was not present in reconciliation "
- "message from agent";
-
- CHECK(slave->offerOperations.contains(uuid));
- removeOfferOperation(slave->offerOperations.at(uuid));
+ LOG(WARNING) << "Performing explicit reconciliation with agent for"
+ << " known offer operation " << uuid.toString()
+ << " since it was not present in original"
+ << " reconciliation message from agent";
+
+ ReconcileOfferOperationsMessage::Operation* reconcileOperation =
+ reconcile.add_operations();
+ reconcileOperation->set_operation_uuid(uuid.toBytes());
+
+ if (providerId.isSome()) {
+ reconcileOperation->mutable_resource_provider_id()
+ ->CopyFrom(providerId.get());
+ }
}
}
}
@@ -7679,6 +7687,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
}
}
+ if (reconcile.operations_size() > 0) {
+ send(slave->pid, reconcile);
+ }
+
// Now update the agent's state and total resources in the allocator.
allocator->updateSlave(slaveId, slave->info, slave->totalResources);
[5/7] mesos git commit: Fixed naming of a variable in
`status_update_manager_process.hpp`.
Posted by gr...@apache.org.
Fixed naming of a variable in `status_update_manager_process.hpp`.
Renamed the variable `_update` to `update_` to make the name consistent
with the style guide.
Review: https://reviews.apache.org/r/64473/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e70097c5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e70097c5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e70097c5
Branch: refs/heads/master
Commit: e70097c55bf63c23eedf3151047c40a1b698240e
Parents: 3fa8d64
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Tue Dec 12 16:18:15 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Tue Dec 12 16:55:47 2017 -0800
----------------------------------------------------------------------
.../status_update_manager_process.hpp | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e70097c5/src/status_update_manager/status_update_manager_process.hpp
----------------------------------------------------------------------
diff --git a/src/status_update_manager/status_update_manager_process.hpp b/src/status_update_manager/status_update_manager_process.hpp
index a44551f..935f754 100644
--- a/src/status_update_manager/status_update_manager_process.hpp
+++ b/src/status_update_manager/status_update_manager_process.hpp
@@ -810,20 +810,20 @@ private:
}
// Get the corresponding update for this ACK.
- const Result<UpdateType>& _update = next();
- if (_update.isError()) {
- return Error(_update.error());
+ const Result<UpdateType>& update_ = next();
+ if (update_.isError()) {
+ return Error(update_.error());
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
- if (_update.isNone()) {
+ if (update_.isNone()) {
return Error(
"Unexpected acknowledgment (UUID: " + statusUuid.toString() +
") for " + statusUpdateType + " stream " + stringify(streamId));
}
- const UpdateType& update = _update.get();
+ const UpdateType& update = update_.get();
if (acknowledged.contains(statusUuid)) {
LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType