You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by me...@apache.org on 2016/01/07 19:46:41 UTC

mesos git commit: Report executor exit to framework schedulers.

Repository: mesos
Updated Branches:
  refs/heads/master 95406d642 -> 932df4885


Report executor exit to framework schedulers.

This is an MVP (minimum viable product) that starts the work of notifying
schedulers when an executor terminates. Next steps would be sending this
message reliably, and/or splitting Event::FAILURE into separate events for
slave failure and executor termination.

Review: https://reviews.apache.org/r/40429/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/932df488
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/932df488
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/932df488

Branch: refs/heads/master
Commit: 932df4885c91083697a378ca8f09a47c194e5fbb
Parents: 95406d6
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu Jan 7 01:58:45 2016 -0800
Committer: Adam B <ad...@mesosphere.io>
Committed: Thu Jan 7 10:46:24 2016 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  1 +
 docs/app-framework-development-guide.md         |  2 +
 docs/upgrades.md                                |  3 +-
 include/mesos/scheduler.hpp                     |  1 +
 src/java/src/org/apache/mesos/Scheduler.java    |  2 +
 .../interface/src/mesos/interface/__init__.py   |  1 +
 src/sched/sched.cpp                             | 56 ++++++++++++++++++--
 src/tests/fault_tolerance_tests.cpp             | 16 +++---
 src/tests/gc_tests.cpp                          |  6 +++
 src/tests/master_slave_reconciliation_tests.cpp |  2 +
 src/tests/master_tests.cpp                      |  2 +
 src/tests/scheduler_event_call_tests.cpp        | 16 ++++--
 src/tests/slave_tests.cpp                       | 17 ++++++
 13 files changed, 109 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 2aa083c..912a402 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,7 @@ Release Notes - Mesos - Version 0.27.0 (WIP)
 --------------------------------------------
 
 ** API Changes:
+  * [MESOS-313]  - Report executor termination to framework schedulers.
   * [MESOS-2315] - Removed deprecated CommandInfo::ContainerInfo.
   * [MESOS-3988] - Implicit roles.
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/docs/app-framework-development-guide.md
----------------------------------------------------------------------
diff --git a/docs/app-framework-development-guide.md b/docs/app-framework-development-guide.md
index 4a43a93..b4c53d8 100644
--- a/docs/app-framework-development-guide.md
+++ b/docs/app-framework-development-guide.md
@@ -114,6 +114,8 @@ virtual void slaveLost(SchedulerDriver* driver,
  * Invoked when an executor has exited/terminated. Note that any
  * tasks running will have TASK_LOST status updates automagically
  * generated.
+ *
+ * NOTE: This callback is not reliably delivered.
  */
 virtual void executorLost(SchedulerDriver* driver,
                           const ExecutorID& executorId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/docs/upgrades.md
----------------------------------------------------------------------
diff --git a/docs/upgrades.md b/docs/upgrades.md
index aebdc1e..60cd08a 100644
--- a/docs/upgrades.md
+++ b/docs/upgrades.md
@@ -12,6 +12,8 @@ This document serves as a guide for users who wish to upgrade an existing Mesos
 
 * The Allocator API has changed due to the introduction of implicit roles. Custom allocator implementations will need to be updated. See [MESOS-4000](https://issues.apache.org/jira/browse/MESOS-4000) for more information.
 
+* The `executorLost` callback in the Scheduler interface will now be called whenever the slave detects termination of a custom executor. This callback was never called in previous versions, so please make sure any framework schedulers can now safely handle this callback. Note that this callback may not be reliably delivered.
+
 ## Upgrading from 0.25.x to 0.26.x ##
 
 **NOTE** The names of some TaskStatus::Reason enums have been changed. But the tag numbers remain unchanged, so it is backwards compatible. Frameworks using the new version might need to do some compile time adjustments:
@@ -27,7 +29,6 @@ On slaves, the affected `data` field was originally found via `executors[*].task
 
 **NOTE** The `NetworkInfo` protobuf has been changed. The fields `protocol` and `ip_address` are now deprecated. The new field `ip_addresses` subsumes the information provided by them.
 
-
 ## Upgrading from 0.24.x to 0.25.x
 
 **NOTE** The following endpoints will be deprecated in favor of new endpoints. Both versions will be available in 0.25 but the deprecated endpoints will be removed in a subsequent release.

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 049c041..8b506bb 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -149,6 +149,7 @@ public:
   // Invoked when an executor has exited/terminated. Note that any
   // tasks running will have TASK_LOST status updates automagically
   // generated.
+  // NOTE: This callback is not reliably delivered.
   virtual void executorLost(
       SchedulerDriver* driver,
       const ExecutorID& executorId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/java/src/org/apache/mesos/Scheduler.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/Scheduler.java b/src/java/src/org/apache/mesos/Scheduler.java
index 4f04883..fd1ad89 100644
--- a/src/java/src/org/apache/mesos/Scheduler.java
+++ b/src/java/src/org/apache/mesos/Scheduler.java
@@ -177,6 +177,8 @@ public interface Scheduler {
    * tasks running will have TASK_LOST status updates automagically
    * generated.
    *
+   * NOTE: This callback is not reliably delivered.
+   *
    * @param driver      The driver that was used to run this scheduler.
    * @param executorId  The ID of the executor that was lost.
    * @param slaveId     The ID of the slave that launched the executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/python/interface/src/mesos/interface/__init__.py
----------------------------------------------------------------------
diff --git a/src/python/interface/src/mesos/interface/__init__.py b/src/python/interface/src/mesos/interface/__init__.py
index 4be502f..ce00d54 100644
--- a/src/python/interface/src/mesos/interface/__init__.py
+++ b/src/python/interface/src/mesos/interface/__init__.py
@@ -118,6 +118,7 @@ class Scheduler(object):
     """
       Invoked when an executor has exited/terminated. Note that any tasks
       running will have TASK_LOST status updates automatically generated.
+      NOTE: This callback is not reliably delivered.
     """
 
   def error(self, driver, message):

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 44eb4f5..38940b7 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -204,6 +204,12 @@ protected:
         &SchedulerProcess::lostSlave,
         &LostSlaveMessage::slave_id);
 
+    install<ExitedExecutorMessage>(
+        &SchedulerProcess::lostExecutor,
+        &ExitedExecutorMessage::executor_id,
+        &ExitedExecutorMessage::slave_id,
+        &ExitedExecutorMessage::status);
+
     install<ExecutorToFrameworkMessage>(
         &SchedulerProcess::frameworkMessage,
         &ExecutorToFrameworkMessage::slave_id,
@@ -583,10 +589,12 @@ protected:
 
         if (event.failure().has_slave_id() &&
             event.failure().has_executor_id()) {
-          // NOTE: We silently drop executor FAILURE messages
-          // because this matches the existing behavior of the
-          // scheduler driver: there is currently no install
-          // handler for ExitedExecutorMessage.
+          CHECK(event.failure().has_status());
+          lostExecutor(
+              from,
+              event.failure().executor_id(),
+              event.failure().slave_id(),
+              event.failure().status());
         } else if (event.failure().has_slave_id()) {
           lostSlave(from, event.failure().slave_id());
         } else {
@@ -992,6 +1000,46 @@ protected:
     VLOG(1) << "Scheduler::slaveLost took " << stopwatch.elapsed();
   }
 
+  void lostExecutor(
+      const UPID& from,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int32_t status)
+  {
+    if (!running.load()) {
+      VLOG(1)
+        << "Ignoring lost executor message because the driver is not running!";
+      return;
+    }
+
+    if (!connected) {
+      VLOG(1)
+        << "Ignoring lost executor message because the driver is disconnected!";
+      return;
+    }
+
+    CHECK_SOME(master);
+    if (from != master.get().pid()) {
+      VLOG(1) << "Ignoring lost executor message because it was sent "
+              << "from '" << from << "' instead of the leading master '"
+              << master.get().pid() << "'";
+      return;
+    }
+
+    VLOG(1)
+      << "Executor " << executorId << " on slave " << slaveId
+      << " exited with status " << status;
+
+    Stopwatch stopwatch;
+    if (FLAGS_v >= 1) {
+      stopwatch.start();
+    }
+
+    scheduler->executorLost(driver, executorId, slaveId, status);
+
+    VLOG(1) << "Scheduler::executorLost took " << stopwatch.elapsed();
+  }
+
   void frameworkMessage(
       const SlaveID& slaveId,
       const ExecutorID& executorId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index ba657d0..fd1c3c9 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -286,17 +286,16 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
   Future<Nothing> executorTerminated =
     FUTURE_DISPATCH(_, &Slave::executorTerminated);
 
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
   // Induce an ExitedExecutorMessage from the slave.
   containerizer.destroy(
       frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
 
   AWAIT_READY(executorTerminated);
 
-  // Slave should consider the framework completed after it executes
-  // "executorTerminated".
-  Clock::pause();
-  Clock::settle();
-  Clock::resume();
+  AWAIT_READY(executorLost);
 
   // Verify slave sees completed framework.
   slaveState = process::http::get(slave.get(), "state");
@@ -1712,14 +1711,15 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
     FUTURE_PROTOBUF(ExitedExecutorMessage(), slave.get(), master.get());
 
   // Now kill the executor.
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
   containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
 
   // Ensure the master correctly handles the exited executor
   // with no tasks!
   AWAIT_READY(executorExitedMessage);
-  Clock::pause();
-  Clock::settle();
-  Clock::resume();
+  AWAIT_READY(executorLost);
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index f939d27..ef5544b 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -547,6 +547,8 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .Times(AtMost(1)); // Ignore TASK_LOST from killed executor.
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
+
   // Kill the executor and inform the slave.
   containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
 
@@ -646,6 +648,8 @@ TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .Times(AtMost(1)); // Ignore TASK_LOST from killed executor.
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, slaveId, _));
+
   // Kill the executor and inform the slave.
   containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
 
@@ -790,6 +794,8 @@ TEST_F(GarbageCollectorIntegrationTest, Unschedule)
   EXPECT_CALL(exec2, launchTask(_, _))
     .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
 
+  EXPECT_CALL(sched, executorLost(&driver, exec1.id, _, _));
+
   Clock::pause();
 
   // Kill the first executor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 9afa826..d41178e 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -134,6 +134,8 @@ TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminatedExecutor)
   // Ensure the update was sent.
   AWAIT_READY(statusUpdateMessage);
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
+
   // Now kill the executor.
   containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 865fa4a..223b9d2 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -731,6 +731,8 @@ TEST_F(MasterTest, RecoverResources)
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
+
   // Now kill the executor, scheduler should get an offer it's resources.
   containerizer.destroy(offer.framework_id(), executorInfo.executor_id());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index 03f0332..bd8920f 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -584,18 +584,28 @@ TEST_F(SchedulerDriverEventTest, Failure)
   AWAIT_READY(frameworkRegisteredMessage);
   UPID frameworkPid = frameworkRegisteredMessage.get().to;
 
-  // Send a failure for an executor, this should be dropped
-  // to match the existing behavior of the scheduler driver.
+  // Send a failure for an executor, which should trigger executorLost callback.
   SlaveID slaveId;
   slaveId.set_value("S");
 
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+
+  const int32_t status = 255;
+
   Event event;
   event.set_type(Event::FAILURE);
   event.mutable_failure()->mutable_slave_id()->CopyFrom(slaveId);
-  event.mutable_failure()->mutable_executor_id()->set_value("E");
+  event.mutable_failure()->mutable_executor_id()->CopyFrom(executorId);
+  event.mutable_failure()->set_status(status);
+
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, executorId, slaveId, status))
+    .WillOnce(FutureSatisfy(&executorLost));
 
   process::post(master.get(), frameworkPid, event);
 
+  AWAIT_READY(executorLost);
+
   // Now, post a failure for a slave and expect a 'slaveLost'.
   event.mutable_failure()->clear_executor_id();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/932df488/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 328c853..6ea2934 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -293,6 +293,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
   Future<Nothing> schedule =
     FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
 
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
   // Now kill the executor.
   containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID);
 
@@ -1133,6 +1134,9 @@ TEST_F(SlaveTest, MetricsSlaveLaunchErrors)
 
   EXPECT_CALL(sched, statusUpdate(&driver, _));
 
+  // The above injected containerizer failure also triggers executorLost.
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
+
   // Try to start a task
   TaskInfo task = createTask(
       offer.slave_id(),
@@ -1396,6 +1400,10 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
   // stay in TERMINATING for a while.
   DROP_PROTOBUFS(ShutdownExecutorMessage(), slave.get(), _);
 
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
   // Send a ShutdownMessage instead of calling Stop() directly
   // to avoid blocking.
   post(master.get(), slave.get(), ShutdownMessage());
@@ -1405,6 +1413,8 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
   Clock::settle();
   Clock::resume();
 
+  AWAIT_READY(executorLost);
+
   // Clean up.
   driver.stop();
   driver.join();
@@ -1491,6 +1501,10 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
   EXPECT_CALL(exec, killTask(_, _))
     .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
 
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
   // Kill one of the tasks. The failed update should result in the
   // second task going lost when the container is destroyed.
   driver.killTask(tasks[0].task_id());
@@ -1504,6 +1518,8 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
   EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
   EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());
 
+  AWAIT_READY(executorLost);
+
   driver.stop();
   driver.join();
 
@@ -1607,6 +1623,7 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails)
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status));
+  EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
 
   driver.start();