You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/10 18:06:53 UTC
[3/3] mesos git commit: Modified Slave to get container status from
Containerizer.
Modified Slave to get container status from Containerizer.
Review: https://reviews.apache.org/r/43258/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38166b51
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38166b51
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38166b51
Branch: refs/heads/master
Commit: 38166b5117f39bdfe7f5122ce28bb0fab2d6b260
Parents: c869b06
Author: Avinash sridharan <av...@mesosphere.io>
Authored: Tue Feb 9 16:26:07 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 10 09:06:46 2016 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 96 +++++++++++++++++++-------
src/slave/slave.hpp | 13 +++-
src/tests/master_tests.cpp | 9 +--
src/tests/reconciliation_tests.cpp | 9 +--
src/tests/slave_tests.cpp | 9 +--
src/tests/status_update_manager_tests.cpp | 27 ++++----
6 files changed, 111 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 07f9371..f0be0d5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1162,7 +1162,7 @@ void Slave::reregistered(
// updates for unknown frameworks.
statusUpdateManager->update(update, info.id())
.onAny(defer(self(),
- &Slave::__statusUpdate,
+ &Slave::___statusUpdate,
lambda::_1,
update,
UPID()));
@@ -2990,7 +2990,7 @@ void Slave::reregisterExecutorTimeout()
// This can be called in two ways:
// 1) When a status update from the executor is received.
// 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
-// NOTE: We set the pid in 'Slave::__statusUpdate()' to 'pid' so that
+// NOTE: We set the pid in 'Slave::___statusUpdate()' to 'pid' so that
// whoever sent this update will get an ACK. This is important because
// we allow executors to send updates for tasks that belong to other
// executors. Currently we allow this because we cannot guarantee
@@ -3075,22 +3075,6 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
}
}
- // Fill in the container IP address with the IP from the agent PID, if not
- // already filled in.
- // TODO(karya): Fill in the IP address by looking up the executor PID.
- ContainerStatus* containerStatus =
- update.mutable_status()->mutable_container_status();
- if (containerStatus->network_infos().size() == 0) {
- NetworkInfo* networkInfo = containerStatus->add_network_infos();
-
- // TODO(CD): Deprecated -- Remove after 0.27.0.
- networkInfo->set_ip_address(stringify(self().address.ip));
-
- NetworkInfo::IPAddress* ipAddress =
- networkInfo->add_ip_addresses();
- ipAddress->set_ip_address(stringify(self().address.ip));
- }
-
const TaskStatus& status = update.status();
Executor* executor = framework->getExecutor(status.task_id());
@@ -3111,8 +3095,14 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
// re-registered. In this case, the slave cannot find the executor
// corresponding to this task because the task has been moved to
// 'Executor::completedTasks'.
+ //
+ // NOTE: We do not set the `ContainerStatus` (including the
+ // `NetworkInfo` within the `ContainerStatus`) for this case,
+ // because the container is unknown. We cannot use the slave IP
+ // address here (for the `NetworkInfo`) since we do not know the
+ // type of network isolation used for this container.
statusUpdateManager->update(update, info.id())
- .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+ .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
return;
}
@@ -3151,6 +3141,62 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
metrics.valid_status_updates++;
+ // Before sending the update, retrieve the container status.
+ containerizer->status(executor->containerId)
+ .onAny(defer(self(),
+ &Slave::_statusUpdate,
+ update,
+ pid,
+ executor->id,
+ lambda::_1));
+}
+
+
+void Slave::_statusUpdate(
+ StatusUpdate update,
+ const Option<process::UPID>& pid,
+ const ExecutorID& executorId,
+ const Future<ContainerStatus>& future)
+{
+ ContainerStatus* containerStatus =
+ update.mutable_status()->mutable_container_status();
+
+ // There can be cases where a container is already removed from the
+ // containerizer before the `status` call is dispatched to the
+ // containerizer, leading to the failure of the returned `Future`.
+ // In such a case we leave the `ContainerStatus` untouched (the
+ // non-ready `Future` is ignored) and simply continue processing the
+ // `StatusUpdate`.
+ if (future.isReady()) {
+ containerStatus->MergeFrom(future.get());
+
+ // Fill in the container IP address with the IP from the agent
+ // PID, if not already filled in.
+ //
+ // TODO(karya): Fill in the IP address by looking up the executor PID.
+ if (containerStatus->network_infos().size() == 0) {
+ NetworkInfo* networkInfo = containerStatus->add_network_infos();
+
+ // TODO(CD): Deprecated -- Remove after 0.27.0.
+ networkInfo->set_ip_address(stringify(self().address.ip));
+
+ NetworkInfo::IPAddress* ipAddress =
+ networkInfo->add_ip_addresses();
+ ipAddress->set_ip_address(stringify(self().address.ip));
+ }
+ }
+
+
+ const TaskStatus& status = update.status();
+
+ Executor* executor = getExecutor(update.framework_id(), executorId);
+ if (executor == NULL) {
+ LOG(WARNING) << "Ignoring container status update for framework "
+ << update.framework_id()
+ << "for a non-existent executor";
+ return;
+ }
+
// We set the latest state of the task here so that the slave can
// inform the master about the latest state (via status update or
// ReregisterSlaveMessage message) as soon as possible. Master can
@@ -3175,7 +3221,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
// sending the status update.
containerizer->update(executor->containerId, executor->resources)
.onAny(defer(self(),
- &Slave::_statusUpdate,
+ &Slave::__statusUpdate,
lambda::_1,
update,
pid,
@@ -3184,7 +3230,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
executor->checkpoint));
} else {
// Immediately send the status update.
- _statusUpdate(None(),
+ __statusUpdate(None(),
update,
pid,
executor->id,
@@ -3194,7 +3240,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
}
-void Slave::_statusUpdate(
+void Slave::__statusUpdate(
const Option<Future<Nothing>>& future,
const StatusUpdate& update,
const Option<UPID>& pid,
@@ -3229,16 +3275,16 @@ void Slave::_statusUpdate(
if (checkpoint) {
// Ask the status update manager to checkpoint and reliably send the update.
statusUpdateManager->update(update, info.id(), executorId, containerId)
- .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+ .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
} else {
// Ask the status update manager to just retry the update.
statusUpdateManager->update(update, info.id())
- .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+ .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
}
}
-void Slave::__statusUpdate(
+void Slave::___statusUpdate(
const Future<Nothing>& future,
const StatusUpdate& update,
const Option<UPID>& pid)
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a3830ff..ced835d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -215,9 +215,18 @@ public:
// to ensure source field is set.
void statusUpdate(StatusUpdate update, const Option<process::UPID>& pid);
+ // Continues handling a `StatusUpdate` from an executor once the
+ // containerizer's `status` call for the executor's container has
+ // completed (whether it succeeded or not).
+ void _statusUpdate(
+ StatusUpdate update,
+ const Option<process::UPID>& pid,
+ const ExecutorID& executorId,
+ const Future<ContainerStatus>& containerStatus);
+
// Continue handling the status update after optionally updating the
// container's resources.
- void _statusUpdate(
+ void __statusUpdate(
const Option<Future<Nothing>>& future,
const StatusUpdate& update,
const Option<process::UPID>& pid,
@@ -228,7 +237,7 @@ public:
// This is called when the status update manager finishes
// handling the update. If the handling is successful, an
// acknowledgment is sent to the executor.
- void __statusUpdate(
+ void ___statusUpdate(
const process::Future<Nothing>& future,
const StatusUpdate& update,
const Option<process::UPID>& pid);
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 0357b1c..393a6f5f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2776,7 +2776,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
Future<StatusUpdateMessage> statusUpdateMessage =
FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
- Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
driver.start();
@@ -2784,9 +2784,10 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
AWAIT_READY(statusUpdateMessage);
// Ensure status update manager handles TASK_RUNNING update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
- Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate2 =
+ FUTURE_DISPATCH(_, &Slave::___statusUpdate);
// Now send TASK_FINISHED update.
TaskStatus finishedStatus;
@@ -2795,7 +2796,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
execDriver->sendStatusUpdate(finishedStatus);
// Ensure status update manager handles TASK_FINISHED update.
- AWAIT_READY(__statusUpdate2);
+ AWAIT_READY(___statusUpdate2);
Future<Nothing> recoverResources = FUTURE_DISPATCH(
_, &MesosAllocatorProcess::recoverResources);
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 1cbc323..97112c4 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -815,7 +815,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
- Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
driver.start();
@@ -826,9 +826,10 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
AWAIT_READY(statusUpdateMessage);
// Ensure status update manager handles TASK_RUNNING update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
- Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate2 =
+ FUTURE_DISPATCH(_, &Slave::___statusUpdate);
// Now send TASK_FINISHED update.
TaskStatus finishedStatus;
@@ -837,7 +838,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
execDriver->sendStatusUpdate(finishedStatus);
// Ensure status update manager handles TASK_FINISHED update.
- AWAIT_READY(__statusUpdate2);
+ AWAIT_READY(___statusUpdate2);
EXPECT_CALL(sched, disconnected(&driver))
.WillOnce(Return());
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0884ee5..c7f5a70 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2117,7 +2117,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
- Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
driver.start();
@@ -2128,9 +2128,10 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
AWAIT_READY(statusUpdateMessage);
// Ensure status update manager handles TASK_RUNNING update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
- Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate2 =
+ FUTURE_DISPATCH(_, &Slave::___statusUpdate);
// Now send TASK_FINISHED update.
TaskStatus finishedStatus;
@@ -2139,7 +2140,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
execDriver->sendStatusUpdate(finishedStatus);
// Ensure status update manager handles TASK_FINISHED update.
- AWAIT_READY(__statusUpdate2);
+ AWAIT_READY(___statusUpdate2);
Future<ReregisterSlaveMessage> reregisterSlaveMessage =
FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 7bedd49..d64d3b8 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -505,8 +505,8 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
- Future<Nothing> __statusUpdate =
- FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate =
+ FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
Clock::pause();
@@ -520,10 +520,10 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
// At this point the status update manager has enqueued
// TASK_FINISHED update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
- Future<Nothing> __statusUpdate2 =
- FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate2 =
+ FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
// Now send a TASK_KILLED update for the same task.
TaskStatus status2 = status.get();
@@ -532,7 +532,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
// At this point the status update manager has enqueued
// TASK_FINISHED and TASK_KILLED updates.
- AWAIT_READY(__statusUpdate2);
+ AWAIT_READY(___statusUpdate2);
// After we advance the clock, the scheduler should receive
// the retried TASK_FINISHED update and acknowledge it. The
@@ -725,15 +725,15 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
AWAIT_READY(statusUpdateAckMessage);
- Future<Nothing> __statusUpdate =
- FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate =
+ FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
// Now resend the TASK_RUNNING update.
process::post(slave.get(), statusUpdateMessage.get());
// At this point the status update manager has handled
// the duplicate status update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
// After we advance the clock, the status update manager should
// retry the TASK_RUNNING update and the scheduler should receive
@@ -794,7 +794,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
Future<StatusUpdateMessage> statusUpdateMessage =
DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
- Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
driver.start();
@@ -802,12 +802,13 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
AWAIT_READY(statusUpdateMessage);
// Ensure the status update manager handles the TASK_RUNNING update.
- AWAIT_READY(__statusUpdate);
+ AWAIT_READY(___statusUpdate);
// Pause the clock to avoid status update manager from retrying.
Clock::pause();
- Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+ Future<Nothing> ___statusUpdate2 =
+ FUTURE_DISPATCH(_, &Slave::___statusUpdate);
// Now send TASK_FINISHED update.
TaskStatus finishedStatus;
@@ -816,7 +817,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
execDriver->sendStatusUpdate(finishedStatus);
// Ensure the status update manager handles the TASK_FINISHED update.
- AWAIT_READY(__statusUpdate2);
+ AWAIT_READY(___statusUpdate2);
// Signal when the second update is dropped.
Future<StatusUpdateMessage> statusUpdateMessage2 =