Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/10 18:06:53 UTC

[3/3] mesos git commit: Modified Slave to get container status from Containerizer.

Modified Slave to get container status from Containerizer.

Review: https://reviews.apache.org/r/43258/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38166b51
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38166b51
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38166b51

Branch: refs/heads/master
Commit: 38166b5117f39bdfe7f5122ce28bb0fab2d6b260
Parents: c869b06
Author: Avinash sridharan <av...@mesosphere.io>
Authored: Tue Feb 9 16:26:07 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 10 09:06:46 2016 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp                       | 96 +++++++++++++++++++-------
 src/slave/slave.hpp                       | 13 +++-
 src/tests/master_tests.cpp                |  9 +--
 src/tests/reconciliation_tests.cpp        |  9 +--
 src/tests/slave_tests.cpp                 |  9 +--
 src/tests/status_update_manager_tests.cpp | 27 ++++----
 6 files changed, 111 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
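
For context, the change replaces the unconditional agent-IP fill-in with an
asynchronous query to the containerizer, splitting status update handling into
a four-step chain. A minimal sketch of the new handoff, condensed from the
slave.cpp diff below (not a verbatim excerpt):

// New call chain (names as introduced by this commit):
//   Slave::statusUpdate()    -> asks the containerizer for the container status
//   Slave::_statusUpdate()   -> merges the ContainerStatus into the update
//   Slave::__statusUpdate()  -> continues after the optional containerizer
//                               resource update
//   Slave::___statusUpdate() -> runs once the status update manager has handled
//                               the update and an ACK can be sent

// Inside Slave::statusUpdate(), the container status is now fetched
// asynchronously before the update is forwarded:
containerizer->status(executor->containerId)
  .onAny(defer(self(),
               &Slave::_statusUpdate,
               update,
               pid,
               executor->id,
               lambda::_1));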


http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 07f9371..f0be0d5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1162,7 +1162,7 @@ void Slave::reregistered(
         // updates for unknown frameworks.
         statusUpdateManager->update(update, info.id())
           .onAny(defer(self(),
-                       &Slave::__statusUpdate,
+                       &Slave::___statusUpdate,
                        lambda::_1,
                        update,
                        UPID()));
@@ -2990,7 +2990,7 @@ void Slave::reregisterExecutorTimeout()
 // This can be called in two ways:
 // 1) When a status update from the executor is received.
 // 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
-// NOTE: We set the pid in 'Slave::__statusUpdate()' to 'pid' so that
+// NOTE: We set the pid in 'Slave::___statusUpdate()' to 'pid' so that
 // whoever sent this update will get an ACK. This is important because
 // we allow executors to send updates for tasks that belong to other
 // executors. Currently we allow this because we cannot guarantee
@@ -3075,22 +3075,6 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
     }
   }
 
-  // Fill in the container IP address with the IP from the agent PID, if not
-  // already filled in.
-  // TODO(karya): Fill in the IP address by looking up the executor PID.
-  ContainerStatus* containerStatus =
-    update.mutable_status()->mutable_container_status();
-  if (containerStatus->network_infos().size() == 0) {
-    NetworkInfo* networkInfo = containerStatus->add_network_infos();
-
-    // TODO(CD): Deprecated -- Remove after 0.27.0.
-    networkInfo->set_ip_address(stringify(self().address.ip));
-
-    NetworkInfo::IPAddress* ipAddress =
-      networkInfo->add_ip_addresses();
-    ipAddress->set_ip_address(stringify(self().address.ip));
-  }
-
   const TaskStatus& status = update.status();
 
   Executor* executor = framework->getExecutor(status.task_id());
@@ -3111,8 +3095,14 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
     // re-registered. In this case, the slave cannot find the executor
     // corresponding to this task because the task has been moved to
     // 'Executor::completedTasks'.
+    //
+    // NOTE: We do not set the `ContainerStatus` (including the
+    // `NetworkInfo` within the `ContainerStatus`) for this case,
+    // because the container is unknown. We cannot use the slave IP
+    // address here (for the `NetworkInfo`) since we do not know the
+    // type of network isolation used for this container.
     statusUpdateManager->update(update, info.id())
-      .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
 
     return;
   }
@@ -3151,6 +3141,62 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
+  // Before sending update, we need to retrieve the container status.
+  containerizer->status(executor->containerId)
+    .onAny(defer(self(),
+                 &Slave::_statusUpdate,
+                 update,
+                 pid,
+                 executor->id,
+                 lambda::_1));
+}
+
+
+void Slave::_statusUpdate(
+    StatusUpdate update,
+    const Option<process::UPID>& pid,
+    const ExecutorID& executorId,
+    const Future<ContainerStatus>& future)
+{
+  ContainerStatus* containerStatus =
+    update.mutable_status()->mutable_container_status();
+
+  // There can be cases where a container is already removed from the
+  // containerizer before the `status` call is dispatched to the
+  // containerizer, leading to the failure of the returned `Future`.
+  // In such a case we should simply not update the `ContainerStatus`
+  // with the returned `Future` but continue processing the
+  // `StatusUpdate`.
+  if (future.isReady()) {
+    containerStatus->MergeFrom(future.get());
+
+    // Fill in the container IP address with the IP from the agent
+    // PID, if not already filled in.
+    //
+    // TODO(karya): Fill in the IP address by looking up the executor PID.
+    if (containerStatus->network_infos().size() == 0) {
+      NetworkInfo* networkInfo = containerStatus->add_network_infos();
+
+      // TODO(CD): Deprecated -- Remove after 0.27.0.
+      networkInfo->set_ip_address(stringify(self().address.ip));
+
+      NetworkInfo::IPAddress* ipAddress =
+        networkInfo->add_ip_addresses();
+      ipAddress->set_ip_address(stringify(self().address.ip));
+    }
+  }
+
+
+  const TaskStatus& status = update.status();
+
+  Executor* executor = getExecutor(update.framework_id(), executorId);
+  if (executor == NULL) {
+    LOG(WARNING) << "Ignoring container status update for framework "
+                 << update.framework_id()
+                 << " for a non-existent executor";
+    return;
+  }
+
   // We set the latest state of the task here so that the slave can
   // inform the master about the latest state (via status update or
   // ReregisterSlaveMessage message) as soon as possible. Master can
@@ -3175,7 +3221,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
     // sending the status update.
     containerizer->update(executor->containerId, executor->resources)
       .onAny(defer(self(),
-                   &Slave::_statusUpdate,
+                   &Slave::__statusUpdate,
                    lambda::_1,
                    update,
                    pid,
@@ -3184,7 +3230,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
                    executor->checkpoint));
   } else {
     // Immediately send the status update.
-    _statusUpdate(None(),
+    __statusUpdate(None(),
                   update,
                   pid,
                   executor->id,
@@ -3194,7 +3240,7 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 }
 
 
-void Slave::_statusUpdate(
+void Slave::__statusUpdate(
     const Option<Future<Nothing>>& future,
     const StatusUpdate& update,
     const Option<UPID>& pid,
@@ -3229,16 +3275,16 @@ void Slave::_statusUpdate(
   if (checkpoint) {
     // Ask the status update manager to checkpoint and reliably send the update.
     statusUpdateManager->update(update, info.id(), executorId, containerId)
-      .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
   } else {
     // Ask the status update manager to just retry the update.
     statusUpdateManager->update(update, info.id())
-      .onAny(defer(self(), &Slave::__statusUpdate, lambda::_1, update, pid));
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
   }
 }
 
 
-void Slave::__statusUpdate(
+void Slave::___statusUpdate(
     const Future<Nothing>& future,
     const StatusUpdate& update,
     const Option<UPID>& pid)

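To make the error handling in the new Slave::_statusUpdate() easier to follow,
here is a condensed, hedged sketch of its merge logic (a fragment only, assuming
the enclosing function's `future`, `containerStatus`, and `self().address.ip`
exactly as declared in the hunk above; not a verbatim excerpt):

// Only a ready future contributes a ContainerStatus. A failed or discarded
// future (e.g. the container was destroyed before the `status` call was
// dispatched) is skipped and the update proceeds without it.
if (future.isReady()) {
  containerStatus->MergeFrom(future.get());

  // Preserve the previous behavior: if the containerizer reported no
  // NetworkInfo, fall back to the agent's own IP address.
  if (containerStatus->network_infos().size() == 0) {
    NetworkInfo* networkInfo = containerStatus->add_network_infos();

    // Deprecated field (see the TODO(CD) note in the diff above).
    networkInfo->set_ip_address(stringify(self().address.ip));

    networkInfo->add_ip_addresses()->set_ip_address(
        stringify(self().address.ip));
  }
}
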
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a3830ff..ced835d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -215,9 +215,18 @@ public:
   // to ensure source field is set.
   void statusUpdate(StatusUpdate update, const Option<process::UPID>& pid);
 
+  // Called when the slave receives a `StatusUpdate` from an executor
+  // and the slave needs to retrieve the container status for the
+  // container associated with the executor.
+  void _statusUpdate(
+      StatusUpdate update,
+      const Option<process::UPID>& pid,
+      const ExecutorID& executorId,
+      const Future<ContainerStatus>& containerStatus);
+
   // Continue handling the status update after optionally updating the
   // container's resources.
-  void _statusUpdate(
+  void __statusUpdate(
       const Option<Future<Nothing>>& future,
       const StatusUpdate& update,
       const Option<process::UPID>& pid,
@@ -228,7 +237,7 @@ public:
   // This is called when the status update manager finishes
   // handling the update. If the handling is successful, an
   // acknowledgment is sent to the executor.
-  void __statusUpdate(
+  void ___statusUpdate(
       const process::Future<Nothing>& future,
       const StatusUpdate& update,
       const Option<process::UPID>& pid);

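The test changes that follow are mechanical: anything that previously waited on
a dispatch of &Slave::__statusUpdate (the status-update-manager completion hook)
now has to wait on &Slave::___statusUpdate, since the old name now refers to an
earlier step in the chain. A hedged sketch of the pattern repeated throughout
the test diffs below (fragment only; the usual test fixture, driver, and
executor setup are assumed):

  // Expect the slave to dispatch the renamed final-stage handler once the
  // status update manager has handled the update.
  Future<Nothing> ___statusUpdate =
    FUTURE_DISPATCH(_, &Slave::___statusUpdate);

  // ... start the driver / send the status update ...

  // Block until the handler has actually been dispatched.
  AWAIT_READY(___statusUpdate);
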
http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 0357b1c..393a6f5f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2776,7 +2776,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
   Future<StatusUpdateMessage> statusUpdateMessage =
     FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
 
-  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   driver.start();
 
@@ -2784,9 +2784,10 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
   AWAIT_READY(statusUpdateMessage);
 
   // Ensure status update manager handles TASK_RUNNING update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
-  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   // Now send TASK_FINISHED update.
   TaskStatus finishedStatus;
@@ -2795,7 +2796,7 @@ TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
   execDriver->sendStatusUpdate(finishedStatus);
 
   // Ensure status update manager handles TASK_FINISHED update.
-  AWAIT_READY(__statusUpdate2);
+  AWAIT_READY(___statusUpdate2);
 
   Future<Nothing> recoverResources = FUTURE_DISPATCH(
       _, &MesosAllocatorProcess::recoverResources);

http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 1cbc323..97112c4 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -815,7 +815,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
   Future<StatusUpdateMessage> statusUpdateMessage =
     DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
 
-  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   driver.start();
 
@@ -826,9 +826,10 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
   AWAIT_READY(statusUpdateMessage);
 
   // Ensure status update manager handles TASK_RUNNING update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
-  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   // Now send TASK_FINISHED update.
   TaskStatus finishedStatus;
@@ -837,7 +838,7 @@ TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState)
   execDriver->sendStatusUpdate(finishedStatus);
 
   // Ensure status update manager handles TASK_FINISHED update.
-  AWAIT_READY(__statusUpdate2);
+  AWAIT_READY(___statusUpdate2);
 
   EXPECT_CALL(sched, disconnected(&driver))
     .WillOnce(Return());

http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 0884ee5..c7f5a70 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2117,7 +2117,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
   Future<StatusUpdateMessage> statusUpdateMessage =
     DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
 
-  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   driver.start();
 
@@ -2128,9 +2128,10 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
   AWAIT_READY(statusUpdateMessage);
 
   // Ensure status update manager handles TASK_RUNNING update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
-  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   // Now send TASK_FINISHED update.
   TaskStatus finishedStatus;
@@ -2139,7 +2140,7 @@ TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState)
   execDriver->sendStatusUpdate(finishedStatus);
 
   // Ensure status update manager handles TASK_FINISHED update.
-  AWAIT_READY(__statusUpdate2);
+  AWAIT_READY(___statusUpdate2);
 
   Future<ReregisterSlaveMessage> reregisterSlaveMessage =
     FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);

http://git-wip-us.apache.org/repos/asf/mesos/blob/38166b51/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 7bedd49..d64d3b8 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -505,8 +505,8 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
   Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
     DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
 
-  Future<Nothing> __statusUpdate =
-    FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
 
   Clock::pause();
 
@@ -520,10 +520,10 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
 
   // At this point the status update manager has enqueued
   // TASK_FINISHED update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
-  Future<Nothing> __statusUpdate2 =
-    FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
 
   // Now send a TASK_KILLED update for the same task.
   TaskStatus status2 = status.get();
@@ -532,7 +532,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
 
   // At this point the status update manager has enqueued
   // TASK_FINISHED and TASK_KILLED updates.
-  AWAIT_READY(__statusUpdate2);
+  AWAIT_READY(___statusUpdate2);
 
   // After we advance the clock, the scheduler should receive
   // the retried TASK_FINISHED update and acknowledge it. The
@@ -725,15 +725,15 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
 
   AWAIT_READY(statusUpdateAckMessage);
 
-  Future<Nothing> __statusUpdate =
-    FUTURE_DISPATCH(slave.get(), &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::___statusUpdate);
 
   // Now resend the TASK_RUNNING update.
   process::post(slave.get(), statusUpdateMessage.get());
 
   // At this point the status update manager has handled
   // the duplicate status update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
   // After we advance the clock, the status update manager should
   // retry the TASK_RUNNING update and the scheduler should receive
@@ -794,7 +794,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
   Future<StatusUpdateMessage> statusUpdateMessage =
     DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
 
-  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate = FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   driver.start();
 
@@ -802,12 +802,13 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
   AWAIT_READY(statusUpdateMessage);
 
   // Ensure the status update manager handles the TASK_RUNNING update.
-  AWAIT_READY(__statusUpdate);
+  AWAIT_READY(___statusUpdate);
 
   // Pause the clock to avoid status update manager from retrying.
   Clock::pause();
 
-  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+  Future<Nothing> ___statusUpdate2 =
+    FUTURE_DISPATCH(_, &Slave::___statusUpdate);
 
   // Now send TASK_FINISHED update.
   TaskStatus finishedStatus;
@@ -816,7 +817,7 @@ TEST_F(StatusUpdateManagerTest, LatestTaskState)
   execDriver->sendStatusUpdate(finishedStatus);
 
   // Ensure the status update manager handles the TASK_FINISHED update.
-  AWAIT_READY(__statusUpdate2);
+  AWAIT_READY(___statusUpdate2);
 
   // Signal when the second update is dropped.
   Future<StatusUpdateMessage> statusUpdateMessage2 =