You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/19 03:35:09 UTC

svn commit: r1469660 - in /incubator/mesos/trunk/src: slave/slave.cpp slave/slave.hpp tests/allocator_zookeeper_tests.cpp

Author: vinodkone
Date: Fri Apr 19 01:35:08 2013
New Revision: 1469660

URL: http://svn.apache.org/r1469660
Log:
Fixed slave to properly schedule executor directories
for garbage collection.

Review: https://reviews.apache.org/r/10604

Modified:
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Apr 19 01:35:08 2013
@@ -719,16 +719,35 @@ void Slave::runTask(
   LOG(INFO) << "Got assigned task " << task.task_id()
             << " for framework " << frameworkId;
 
-  Future<bool> unschedule = true;
+  // TODO(vinod): These ignored tasks should be consolidated by
+  // the master when the slave re-registers.
 
-  Executor* executor = NULL;
-  ExecutorID executorId = getExecutorInfo(task).executor_id();
+  if (!(task.slave_id() == info.id())) {
+    LOG(WARNING) << "Ignoring task " << task.task_id()
+                 << " because it was intended for the old slave " << info.id();
+    return;
+  }
+
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING || state == TERMINATING) {
+    LOG(WARNING) << "Ignoring task " << task.task_id()
+                 << " because the slave is " << state;
+    // TODO(vinod): Consider sending a TASK_LOST here.
+    // Currently it is tricky because 'statusUpdate()'
+    // ignores updates for unknown frameworks.
+    return;
+  }
+
+  Future<bool> unschedule = true;
 
-  Framework* framework = getFramework(frameworkId);
   // If we are about to create a new framework, unschedule the work
   // and meta directories from getting gc'ed.
+  Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-    // Framework work directory.
+    // Unschedule framework work directory.
     string path = paths::getFrameworkPath(
         flags.work_dir, info.id(), frameworkId);
 
@@ -736,24 +755,30 @@ void Slave::runTask(
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
 
-    // Framework meta directory.
+    // Unschedule framework meta directory.
     path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
     if (os::exists(path)) {
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
-  } else {
-    // We add the corresponding executor to 'pending' to ensure the
-    // framework and top level executor directories are not scheduled
-    // for deletion before '_runTask()' is called.
-    framework->pending.insert(executorId);
 
-    executor = framework->getExecutor(executorId);
+    framework = new Framework(this, frameworkId, frameworkInfo, pid);
+    frameworks[frameworkId] = framework;
   }
 
+  const ExecutorInfo& executorInfo = getExecutorInfo(task);
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  // We add the task to 'pending' to ensure the framework is not
+  // removed and the framework and top level executor directories
+  // are not scheduled for deletion before '_runTask()' is called.
+  CHECK_NOTNULL(framework);
+  framework->pending.put(executorId, task.task_id());
+
   // If we are about to create a new executor, unschedule the top
   // level work and meta directories from getting gc'ed.
+  Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
-    // Executor work directory.
+    // Unschedule executor work directory.
     string path = paths::getExecutorPath(
         flags.work_dir, info.id(), frameworkId, executorId);
 
@@ -761,7 +786,7 @@ void Slave::runTask(
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
 
-    // Executor meta directory.
+    // Unschedule executor meta directory.
     path = paths::getExecutorPath(
         metaDir, info.id(), frameworkId, executorId);
 
@@ -792,6 +817,29 @@ void Slave::_runTask(
   LOG(INFO) << "Launching task " << task.task_id()
             << " for framework " << frameworkId;
 
+  const ExecutorInfo& executorInfo = getExecutorInfo(task);
+  const ExecutorID& executorId = executorInfo.executor_id();
+
+  // Remove the pending task from framework.
+  Framework* framework = getFramework(frameworkId);
+  CHECK_NOTNULL(framework);
+
+  framework->pending.remove(executorId, task.task_id());
+
+  // We don't send a status update here because a terminating
+  // framework cannot send acknowledgements.
+  if (framework->state == Framework::TERMINATING) {
+    LOG(WARNING) << "Ignoring run task " << task.task_id()
+                 << " of framework " << frameworkId
+                 << " because the framework is terminating";
+
+    if (framework->executors.empty() && framework->pending.empty()) {
+      remove(framework);
+    }
+    return;
+  }
+
+
   if (!future.isReady()) {
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
                << (future.isFailed() ? future.failure() : "future discarded");
@@ -804,34 +852,34 @@ void Slave::_runTask(
         "Could not launch the task because we failed to unschedule directories"
         " scheduled for gc");
 
+    // TODO(vinod): Ensure that the status update manager reliably
+    // delivers this update. Currently, we don't guarantee this
+    // because removal of the framework causes the status update
+    // manager to stop retrying for its un-acked updates.
     statusUpdate(update);
+
+    if (framework->executors.empty() && framework->pending.empty()) {
+      remove(framework);
+    }
+
     return;
   }
 
-  CHECK(state == RECOVERING || state == DISCONNECTED ||
-        state == RUNNING || state == TERMINATING)
+  // NOTE: The slave cannot be in 'RECOVERING' because the task would
+  // have been rejected in 'runTask()' in that case.
+  CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
     << state;
 
-  // This could happen if the runTask message was intended for the
-  // old slave, but the new (restarted) slave received it because
-  // it has the same pid.
-  if (state != RUNNING) {
-    LOG(WARNING) << "Cannot run task " << task.task_id()
+  if (state == TERMINATING) {
+    LOG(WARNING) << "Ignoring run task " << task.task_id()
                  << " of framework " << frameworkId
-                 << " because the slave is in " << state << " state";
-
-    const StatusUpdate& update = protobuf::createStatusUpdate(
-        frameworkId,
-        info.id(),
-        task.task_id(),
-        TASK_LOST,
-        "Slave is not in RUNNING state");
+                 << " because the slave is terminating";
 
-    statusUpdate(update);
+    // We don't send a TASK_LOST here because the slave is
+    // terminating.
     return;
   }
 
-  Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
     framework = new Framework(this, frameworkId, frameworkInfo, pid);
     frameworks[frameworkId] = framework;
@@ -839,56 +887,14 @@ void Slave::_runTask(
 
   CHECK_NOTNULL(framework);
 
-  CHECK(framework->state == Framework::RUNNING ||
-        framework->state == Framework::TERMINATING)
-    << framework->state;
-
-  // We don't send a status update here because a terminating
-  // framework cannot send acknowledgements.
-  if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring run task " << task.task_id()
-                 << " of framework " << frameworkId
-                 << " because the framework is terminating";
-    return;
-  }
-
-  const ExecutorInfo& executorInfo = getExecutorInfo(task);
-  const ExecutorID& executorId = executorInfo.executor_id();
+  CHECK(framework->state == Framework::RUNNING) << framework->state;
 
   // Either send the task to an executor or start a new executor
   // and queue the task until the executor has started.
   Executor* executor = framework->getExecutor(executorId);
 
   if (executor == NULL) {
-    // Launch an executor for this task.
-    executor = framework->createExecutor(executorInfo);
-
-    files->attach(executor->directory, executor->directory)
-      .onAny(defer(self(),
-                   &Self::fileAttached,
-                   params::_1,
-                   executor->directory));
-
-    // Tell the isolator to launch the executor.
-    dispatch(isolator,
-             &Isolator::launchExecutor,
-             info.id(),
-             framework->id,
-             framework->info,
-             executor->info,
-             executor->uuid,
-             executor->directory,
-             executor->resources);
-
-    // Make sure the executor registers within the given timeout.
-    // NOTE: We send this message before dispatching the launchExecutor to
-    // the isolator, to make writing tests easier.
-    delay(flags.executor_registration_timeout,
-          self(),
-          &Slave::registerExecutorTimeout,
-          framework->id,
-          executor->id,
-          executor->uuid);
+    executor = framework->launchExecutor(executorInfo);
   }
 
   CHECK_NOTNULL(executor);
@@ -974,19 +980,13 @@ void Slave::killTask(const FrameworkID& 
         state == RUNNING || state == TERMINATING)
     << state;
 
-  if (state != RUNNING) {
+  if (state == RECOVERING || state == TERMINATING) {
     LOG(WARNING) << "Cannot kill task " << taskId
                  << " of framework " << frameworkId
-                 << " because the slave is in " << state << " state";
-
-    const StatusUpdate& update = protobuf::createStatusUpdate(
-        frameworkId,
-        info.id(),
-        taskId,
-        TASK_LOST,
-        "Slave is not in RUNNING state");
-
-    statusUpdate(update);
+                 << " because the slave is " << state;
+    // TODO(vinod): Consider sending a TASK_LOST here.
+    // Currently it is tricky because 'statusUpdate()'
+    // ignores updates for unknown frameworks.
     return;
   }
 
@@ -1126,6 +1126,9 @@ void Slave::shutdownFramework(const Fram
       framework->state = Framework::TERMINATING;
 
       // Shut down all executors of this framework.
+      // NOTE: If there are no executors but 'pending' tasks, the
+      // framework will be removed and all its tasks are appropriately
+      // handled in '_runTask()'.
       foreachvalue (Executor* executor, utils::copy(framework->executors)) {
         CHECK(executor->state == Executor::REGISTERING ||
               executor->state == Executor::RUNNING ||
@@ -1490,9 +1493,11 @@ void Slave::registerExecutor(
       message.mutable_slave_info()->MergeFrom(info);
       send(executor->pid, message);
 
-      LOG(INFO) << "Flushing queued tasks for framework " << framework->id;
-
       foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+        LOG(INFO) << "Flushing queued task " << task.task_id()
+                  << " for executor '" << executor->id << "'"
+                  << " of framework " << framework->id;
+
         stats.tasks[TASK_STAGING]++;
 
         RunTaskMessage message;
@@ -1952,25 +1957,27 @@ void Slave::executorStarted(
     << framework->state;
 
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Framework " << frameworkId
-                 << " for executor '" << executorId
-                 << "' is terminating";
+    LOG(WARNING) << "Killing executor '" << executorId
+                 << "' of framework '" << frameworkId
+                 << "' because the framework is terminating";
+    dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
     return;
   }
 
   Executor* executor = framework->getExecutor(executorId);
   if (executor == NULL) {
-    LOG(WARNING) << "Invalid executor '" << executorId
-                 << "' of framework " << frameworkId
-                 << " has started";
+    LOG(WARNING) << "Killing unknown executor '" << executorId
+                 << "' of framework " << frameworkId;
+    dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
     return;
   }
 
   switch (executor->state) {
     case Executor::TERMINATING:
-      LOG(WARNING) << "Executor '" << executorId
+      LOG(WARNING) << "Killing executor '" << executorId
                    << "' of framework " << frameworkId
-                   << " is terminating";
+                   << " because the executor is terminating";
+      dispatch(isolator, &Isolator::killExecutor, frameworkId, executorId);
       break;
     case Executor::REGISTERING:
     case Executor::RUNNING:
@@ -2157,15 +2164,16 @@ void Slave::remove(Framework* framework,
             << "' of framework " << framework->id;
 
   CHECK(framework->state == Framework::RUNNING ||
-        framework->state == Framework::TERMINATING);
+        framework->state == Framework::TERMINATING)
+    << framework->state;
 
   // Check that this executor has terminated and either has no
   // pending updates or the framework is terminating. We don't
   // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
-  CHECK(executor->state == Executor::TERMINATED &&
-        (executor->updates.empty() ||
-         framework->state == Framework::TERMINATING));
+  CHECK(executor->state == Executor::TERMINATED) << executor->state;
+  CHECK (executor->updates.empty() ||
+         framework->state == Framework::TERMINATING);
 
   // TODO(vinod): Move the responsibility of gc'ing to the
   // Executor struct.
@@ -2205,8 +2213,8 @@ void Slave::remove(Framework* framework,
 
   framework->destroyExecutor(executor->id);
 
-  // Remove this framework if it has no executors running.
-  if (framework->executors.empty()) {
+  // Remove this framework if it has no pending executors and tasks.
+  if (framework->executors.empty() && framework->pending.empty()) {
     remove(framework);
   }
 }
@@ -2221,29 +2229,30 @@ void Slave::remove(Framework* framework)
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING);
 
+  // The invariant here is that a framework should not be removed
+  // if it has either pending executors or pending tasks.
   CHECK(framework->executors.empty());
+  CHECK(framework->pending.empty());
 
   // Close all status update streams for this framework.
   statusUpdateManager->cleanup(framework->id);
 
-
   // Schedule the framework work and meta directories for garbage
-  // collection, only if it has no pending tasks.
+  // collection.
   // TODO(vinod): Move the responsibility of gc'ing to the
   // Framework struct.
-  if (framework->pending.empty()) {
-    const string& path = paths::getFrameworkPath(
-        flags.work_dir, info.id(), framework->id);
 
-    gc.schedule(flags.gc_delay, path);
+  const string& path = paths::getFrameworkPath(
+      flags.work_dir, info.id(), framework->id);
 
-    if (framework->info.checkpoint()) {
-      // Schedule the framework meta directory to get garbage collected.
-      const string& path = paths::getFrameworkPath(
-          metaDir, info.id(), framework->id);
+  gc.schedule(flags.gc_delay, path);
 
-      gc.schedule(flags.gc_delay, path);
-    }
+  if (framework->info.checkpoint()) {
+    // Schedule the framework meta directory to get garbage collected.
+    const string& path = paths::getFrameworkPath(
+        metaDir, info.id(), framework->id);
+
+    gc.schedule(flags.gc_delay, path);
   }
 
   frameworks.erase(framework->id);
@@ -2682,7 +2691,8 @@ Framework::~Framework()
 }
 
 
-Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
+// Create and launch an executor.
+Executor* Framework::launchExecutor(const ExecutorInfo& executorInfo)
 {
   // We create a UUID for the new executor. The UUID uniquely
   // identifies this new instance of the executor across executors
@@ -2704,6 +2714,34 @@ Executor* Framework::createExecutor(cons
 
   CHECK(!executors.contains(executorInfo.executor_id()));
   executors[executorInfo.executor_id()] = executor;
+
+  CHECK_NOTNULL(executor);
+
+  slave->files->attach(executor->directory, executor->directory)
+    .onAny(defer(slave,
+                 &Slave::fileAttached,
+                 params::_1,
+                 executor->directory));
+
+  // Tell the isolator to launch the executor.
+  dispatch(slave->isolator,
+           &Isolator::launchExecutor,
+           slave->info.id(),
+           id,
+           info,
+           executor->info,
+           executor->uuid,
+           executor->directory,
+           executor->resources);
+
+  // Make sure the executor registers within the given timeout.
+  delay(slave->flags.executor_registration_timeout,
+        slave,
+        &Slave::registerExecutorTimeout,
+        id,
+        executor->id,
+        executor->uuid);
+
   return executor;
 }
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Fri Apr 19 01:35:08 2013
@@ -390,7 +390,7 @@ struct Framework
 
   ~Framework();
 
-  Executor* createExecutor(const ExecutorInfo& executorInfo);
+  Executor* launchExecutor(const ExecutorInfo& executorInfo);
   void destroyExecutor(const ExecutorID& executorId);
   Executor* getExecutor(const ExecutorID& executorId);
   Executor* getExecutor(const TaskID& taskId);
@@ -411,7 +411,7 @@ struct Framework
 
   UPID pid;
 
-  hashset<ExecutorID> pending; // Executors with pending tasks.
+  multihashmap<ExecutorID, TaskID> pending; // Executors with pending tasks.
 
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;

Modified: incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp?rev=1469660&r1=1469659&r2=1469660&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/allocator_zookeeper_tests.cpp Fri Apr 19 01:35:08 2013
@@ -144,10 +144,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
   EXPECT_CALL(exec, shutdown(_))
     .WillRepeatedly(DoDefault());
 
-  Future<Nothing> resourcesChanged;
-  EXPECT_CALL(isolator, resourcesChanged(_, _, _))
-    .WillOnce(FutureSatisfy(&resourcesChanged));
-
   driver.start();
 
   AWAIT_READY(registered);
@@ -162,10 +158,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Frame
 
   EXPECT_EQ(TASK_RUNNING, statusUpdate.get().state());
 
-  // Ensures that the task has been fully launched before we kill the
-  // first master.
-  AWAIT_READY(resourcesChanged);
-
   // Stop the failing master from telling the slave to shut down when
   // it is killed.
   Future<process::Message> shutdownMsg =
@@ -289,10 +281,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
   EXPECT_CALL(exec, shutdown(_))
     .WillRepeatedly(DoDefault());
 
-  Future<Nothing> resourcesChanged;
-  EXPECT_CALL(isolator, resourcesChanged(_, _, _))
-    .WillOnce(FutureSatisfy(&resourcesChanged));
-
   driver.start();
 
   AWAIT_READY(registered);
@@ -307,10 +295,6 @@ TYPED_TEST(AllocatorZooKeeperTest, Slave
 
   EXPECT_EQ(TASK_RUNNING, statusUpdate.get().state());
 
-  // Ensures that the task has been fully launched before we kill the
-  // first master.
-  AWAIT_READY(resourcesChanged);
-
   // Stop the failing master from telling the slave to shut down when
   // it is killed.
   Future<process::Message> shutdownMsg =