You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:24 UTC

[01/18] git commit: Fixed slave to not re-register with master in cleanup mode.

Updated Branches:
  refs/heads/master 853c9bab4 -> ad2d66249


Fixed slave to not re-register with master in cleanup mode.

Review: https://reviews.apache.org/r/13442


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6da32671
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6da32671
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6da32671

Branch: refs/heads/master
Commit: 6da32671fe678a074dd3bab9c5b12850cabd6f7d
Parents: ab8d5d3
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 8 15:51:49 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:27 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                |  9 +++++++--
 src/tests/slave_recovery_tests.cpp | 14 ++++++++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6da32671/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index dbc4473..b835ac7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -438,8 +438,13 @@ void Slave::_initialize(const Future<Nothing>& future)
   // in 'cleanup' mode.
   if (frameworks.empty() && flags.recover == "cleanup") {
     terminate(self());
-  } else {
-    // Register with the master.
+  } else if (flags.recover == "reconnect") {
+    // Re-register if reconnecting.
+    // NOTE: Since the slave in cleanup mode never re-registers, if
+    // the master fails over it will not forward the updates from
+    // the "unknown" slave to the scheduler. This could lead to the
+    // slave waiting indefinitely for acknowledgements. The master's
+    // registrar could help in handling this correctly.
     state = DISCONNECTED;
     if (master) {
       doReliableRegistration();

http://git-wip-us.apache.org/repos/asf/mesos/blob/6da32671/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index bd755f6..a0734c3 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -791,8 +791,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   Future<vector<Offer> > offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers))
-    .WillRepeatedly(Return());      // Ignore subsequent offers.
+    .WillOnce(FutureArg<1>(&offers));
 
   driver.start();
 
@@ -815,10 +814,17 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   this->Stop(slave.get());
 
+  // Slave in cleanup mode shouldn't reregister with master and hence
+  // no offers should be made by the master.
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .Times(0);
+
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(FutureArg<1>(&status));
 
+  Future<Nothing> _initialize = FUTURE_DISPATCH(_, &Slave::_initialize);
+
   // Restart the slave in 'cleanup' recovery mode with a new isolator.
   TypeParam isolator2;
 
@@ -839,6 +845,10 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
   AWAIT_READY(status);
   ASSERT_EQ(TASK_FAILED, status.get().state());
 
+  // Wait for recovery to complete.
+  AWAIT_READY(_initialize);
+  Clock::settle();
+
   Clock::resume();
 
   driver.stop();


[07/18] git commit: Changed slave state recovery to ignore absence of files as safe.

Posted by vi...@apache.org.
Changed slave state recovery to ignore absence of files as safe.

Review: https://reviews.apache.org/r/13443


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba2ee7cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba2ee7cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba2ee7cf

Branch: refs/heads/master
Commit: ba2ee7cf1cab0a1edef573fe360b58e670e1d548
Parents: 24991c9
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 8 19:41:01 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:57 2013 -0700

----------------------------------------------------------------------
 src/slave/state.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ba2ee7cf/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 77b29dc..cd74e41 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -70,6 +70,13 @@ Try<SlaveState> SlaveState::recover(
 
   // Read the slave info.
   const string& path = paths::getSlaveInfoPath(rootDir, slaveId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died before it registered
+    // with the master.
+    LOG(WARNING) << "Failed to find slave info file '" << path << "'";
+    return state;
+  }
+
   const Result<SlaveInfo>& slaveInfo = ::protobuf::read<SlaveInfo>(path);
 
   if (!slaveInfo.isSome()) {
@@ -126,6 +133,14 @@ Try<FrameworkState> FrameworkState::recover(
 
   // Read the framework info.
   string path = paths::getFrameworkInfoPath(rootDir, slaveId, frameworkId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died after creating the
+    // framework directory but before it checkpointed the
+    // framework info.
+    LOG(WARNING) << "Failed to find framework info file '" << path << "'";
+    return state;
+  }
+
   const Result<FrameworkInfo>& frameworkInfo =
     ::protobuf::read<FrameworkInfo>(path);
 
@@ -145,6 +160,13 @@ Try<FrameworkState> FrameworkState::recover(
 
   // Read the framework pid.
   path = paths::getFrameworkPidPath(rootDir, slaveId, frameworkId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died after creating the
+    // framework info but before it checkpointed the framework pid.
+    LOG(WARNING) << "Failed to framework pid file '" << path << "'";
+    return state;
+  }
+
   const Try<string>& pid = os::read(path);
 
   if (pid.isError()) {
@@ -205,6 +227,12 @@ Try<ExecutorState> ExecutorState::recover(
   // Read the executor info.
   const string& path =
     paths::getExecutorInfoPath(rootDir, slaveId, frameworkId, executorId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died after creating the executor
+    // directory but before it checkpointed the executor info.
+    LOG(WARNING) << "Failed to find executor info file '" << path << "'";
+    return state;
+  }
 
   const Result<ExecutorInfo>& executorInfo =
     ::protobuf::read<ExecutorInfo>(path);
@@ -330,6 +358,12 @@ Try<RunState> RunState::recover(
   // Read the forked pid.
   string path = paths::getForkedPidPath(
       rootDir, slaveId, frameworkId, executorId, uuid);
+  if (!os::exists(path)) {
+    // This could happen if the slave died before the isolator
+    // checkpointed the forked pid.
+    LOG(WARNING) << "Failed to find executor forked pid file '" << path << "'";
+    return state;
+  }
 
   Try<string> pid = os::read(path);
 
@@ -357,6 +391,14 @@ Try<RunState> RunState::recover(
   path = paths::getLibprocessPidPath(
       rootDir, slaveId, frameworkId, executorId, uuid);
 
+  if (!os::exists(path)) {
+    // This could happen if the slave died before the executor
+    // registered with the slave.
+    LOG(WARNING)
+      << "Failed to find executor libprocess pid file '" << path << "'";
+    return state;
+  }
+
   pid = os::read(path);
 
   if (pid.isError()) {
@@ -399,6 +441,12 @@ Try<TaskState> TaskState::recover(
   // Read the task info.
   string path = paths::getTaskInfoPath(
       rootDir, slaveId, frameworkId, executorId, uuid, taskId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died after creating the task
+    // directory but before it checkpointed the task info.
+    LOG(WARNING) << "Failed to find task info file '" << path << "'";
+    return state;
+  }
 
   const Result<Task>& task = ::protobuf::read<Task>(path);
 
@@ -419,6 +467,12 @@ Try<TaskState> TaskState::recover(
   // Read the status updates.
   path = paths::getTaskUpdatesPath(
       rootDir, slaveId, frameworkId, executorId, uuid, taskId);
+  if (!os::exists(path)) {
+    // This could happen if the slave died before it checkpointed
+    // any status updates for this task.
+    LOG(WARNING) << "Failed to find status updates file '" << path << "'";
+    return state;
+  }
 
   // Open the status updates file for reading and writing (for truncating).
   const Try<int>& fd = os::open(path, O_RDWR);


[18/18] git commit: Fixed status update manager to properly handle duplicate updates.

Posted by vi...@apache.org.
Fixed status update manager to properly handle duplicate updates.

Review: https://reviews.apache.org/r/13520


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad2d6624
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad2d6624
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad2d6624

Branch: refs/heads/master
Commit: ad2d662497afbe91ad62adb2f0b270e32bcad655
Parents: 3bc167a
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 17:38:08 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 11:03:02 2013 -0700

----------------------------------------------------------------------
 src/slave/status_update_manager.cpp       |  20 ++++-
 src/slave/status_update_manager.hpp       |  41 ++++++----
 src/tests/status_update_manager_tests.cpp | 106 +++++++++++++++++++++++++
 3 files changed, 149 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 6d4598e..b6afeb1 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -242,7 +242,7 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
           cleanupStatusUpdateStream(task.id, framework.id);
         } else {
           // If a stream has pending updates after the replay,
-          // send the first pending update
+          // send the first pending update.
           const Result<StatusUpdate>& next = stream->next();
           CHECK(!next.isError());
           if (next.isSome()) {
@@ -317,11 +317,17 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
   }
 
   // Handle the status update.
-  Try<Nothing> result = stream->update(update);
+  Try<bool> result = stream->update(update);
   if (result.isError()) {
     return Future<Nothing>::failed(result.error());
   }
 
+  // We don't return a failed future here so that the slave can re-ack
+  // the duplicate update.
+  if (!result.get()) {
+    return Nothing();
+  }
+
   // Forward the status update to the master if this is the first in the stream.
   // Subsequent status updates will get sent in 'acknowledgement()'.
   if (stream->pending.size() == 1) {
@@ -403,6 +409,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     return Future<bool>::failed(result.error());
   }
 
+  if (!result.get()) {
+    return Future<bool>::failed("Duplicate acknowledgement");
+  }
+
   // Reset the timeout.
   stream->timeout = None();
 
@@ -412,7 +422,9 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     return Future<bool>::failed(next.error());
   }
 
-  if (stream->terminated) {
+  bool terminated = stream->terminated;
+
+  if (terminated) {
     if (next.isSome()) {
       LOG(WARNING) << "Acknowledged a terminal"
                    << " status update " << update.get()
@@ -424,7 +436,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     stream->timeout = forward(next.get());
   }
 
-  return result.get();
+  return !terminated;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index ffc79ae..a032646 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -103,9 +103,9 @@ public:
 
   // Checkpoints the status update to disk if necessary.
   // Also, sends the next pending status update, if any.
-  // @return true if the ACK is handled successfully (e.g., checkpointed)
+  // @return True if the ACK is handled successfully (e.g., checkpointed)
   //              and the task's status update stream is not terminated.
-  //         false same as above except the status update stream is terminated.
+  //         False same as above except the status update stream is terminated.
   //         Failed if there are any errors (e.g., duplicate, checkpointing).
   process::Future<bool> acknowledgement(
       const TaskID& taskId,
@@ -200,7 +200,11 @@ struct StatusUpdateStream
     }
   }
 
-  Try<Nothing> update(const StatusUpdate& update)
+  // This function handles the update, checkpointing if necessary.
+  // @return   True if the update is successfully handled.
+  //           False if the update is a duplicate.
+  //           Error Any errors (e.g., checkpointing).
+  Try<bool> update(const StatusUpdate& update)
   {
     if (error.isSome()) {
       return Error(error.get());
@@ -212,7 +216,7 @@ struct StatusUpdateStream
     if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
       LOG(WARNING) << "Ignoring status update " << update
                    << " that has already been acknowledged by the framework!";
-      return Nothing();
+      return false;
     }
 
     // Check that this update hasn't already been received.
@@ -220,13 +224,22 @@ struct StatusUpdateStream
     // then crashes after it writes it to disk but before it sends an ack.
     if (received.contains(UUID::fromBytes(update.uuid()))) {
       LOG(WARNING) << "Ignoring duplicate status update " << update;
-      return Nothing();
+      return false;
     }
 
     // Handle the update, checkpointing if necessary.
-    return handle(update, StatusUpdateRecord::UPDATE);
+    Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+    if (result.isError()) {
+      return Error(result.error());
+    }
+
+    return true;
   }
 
+  // This function handles the ACK, checkpointing if necessary.
+  // @return   True if the acknowledgement is successfully handled.
+  //           False if the acknowledgement is a duplicate.
+  //           Error Any errors (e.g., checkpointing).
   Try<bool> acknowledgement(
       const TaskID& taskId,
       const FrameworkID& frameworkId,
@@ -238,18 +251,18 @@ struct StatusUpdateStream
     }
 
     if (acknowledged.contains(uuid)) {
-      return Error("Duplicate status update acknowledgment (UUID: "
-                   + uuid.toString() + ") for update " + stringify(update));
+      LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+                    << uuid << ") for update " << update;
+      return false;
     }
 
     // This might happen if we retried a status update and got back
     // acknowledgments for both the original and the retried update.
     if (uuid != UUID::fromBytes(update.uuid())) {
-      return Error(
-        "Unexpected status update acknowledgement (received " +
-        uuid.toString() +
-        ", expecting " + UUID::fromBytes(update.uuid()).toString() +
-        ") for update " + stringify(update));
+      LOG(WARNING) << "Unexpected status update acknowledgement (received "
+                   << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+                   << ") for update " << update;
+      return false;
     }
 
     // Handle the ACK, checkpointing if necessary.
@@ -258,7 +271,7 @@ struct StatusUpdateStream
       return Error(result.error());
     }
 
-    return !terminated;
+    return true;
   }
 
   // Returns the next update (or none, if empty) in the queue.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 6473478..cf420e4 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -660,3 +660,109 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate status updates, when the second
+// update with the same UUID is received before the ACK for the
+// first update. The proper behavior here is for the status update
+// manager to drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture the first status update message.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  Future<Nothing> _statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+  // Now resend the TASK_RUNNING update.
+  process::post(slave.get(), statusUpdateMessage.get());
+
+  // At this point the status update manager has handled
+  // the duplicate status update.
+  AWAIT_READY(_statusUpdate);
+
+  // After we advance the clock, the status update manager should
+  // retry the TASK_RUNNING update and the scheduler should receive
+  // and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::settle();
+
+  // Ensure the scheduler receives the retried TASK_RUNNING update.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[16/18] git commit: Fixed slave to not re-register with Command Executors.

Posted by vi...@apache.org.
Fixed slave to not re-register with Command Executors.

Review: https://reviews.apache.org/r/13487


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ca371ce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ca371ce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ca371ce

Branch: refs/heads/master
Commit: 7ca371ce4ca33f2a189037b57e091c6ad0a5aa48
Parents: 755308d
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 18:56:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 44 +++++++++++++++++++++-----------------------
 src/slave/slave.hpp |  2 ++
 2 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca371ce/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 11e4826..803da8d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -705,13 +705,18 @@ void Slave::doReliableRegistration()
           continue;
         }
 
-        // TODO(bmahler): Kill this in 0.15.0, as in 0.14.0 we've
-        // added code into the Scheduler Driver to ensure the
-        // framework id is set in ExecutorInfo, effectively making
-        // it a required field.
-        ExecutorInfo* executorInfo = message.add_executor_infos();
-        executorInfo->MergeFrom(executor->info);
-        executorInfo->mutable_framework_id()->MergeFrom(framework->id);
+        // Do not re-register with Command Executors because the
+        // master doesn't store them; they are generated by the slave.
+        if (!executor->commandExecutor) {
+          ExecutorInfo* executorInfo = message.add_executor_infos();
+          executorInfo->MergeFrom(executor->info);
+
+          // TODO(bmahler): Kill this in 0.15.0, as in 0.14.0 we've
+          // added code into the Scheduler Driver to ensure the
+          // framework id is set in ExecutorInfo, effectively making
+          // it a required field.
+          executorInfo->mutable_framework_id()->MergeFrom(framework->id);
+        }
 
         // Add launched tasks.
         // TODO(vinod): Use foreachvalue instead once LinkedHashmap
@@ -2135,8 +2140,6 @@ void Slave::executorTerminated(
       monitor.unwatch(frameworkId, executorId)
         .onAny(lambda::bind(_unwatch, lambda::_1, frameworkId, executorId));
 
-      Option<bool> isCommandExecutor;
-
       // Transition all live tasks to TASK_LOST/TASK_FAILED.
       // If the isolator destroyed the executor (e.g., due to OOM event)
       // or if this is a command executor, we send TASK_FAILED status updates
@@ -2155,8 +2158,7 @@ void Slave::executorTerminated(
         foreach (Task* task, executor->launchedTasks.values()) {
           if (!protobuf::isTerminalState(task->state())) {
             mesos::TaskState taskState;
-            isCommandExecutor = !task->has_executor_id();
-            if (destroyed || isCommandExecutor.get()) {
+            if (destroyed || executor->commandExecutor) {
               taskState = TASK_FAILED;
             } else {
               taskState = TASK_LOST;
@@ -2176,8 +2178,7 @@ void Slave::executorTerminated(
         // supports it.
         foreach (const TaskInfo& task, executor->queuedTasks.values()) {
           mesos::TaskState taskState;
-          isCommandExecutor = task.has_command();
-          if (destroyed || isCommandExecutor.get()) {
+          if (destroyed || executor->commandExecutor) {
             taskState = TASK_FAILED;
           } else {
             taskState = TASK_LOST;
@@ -2192,16 +2193,10 @@ void Slave::executorTerminated(
         }
       }
 
-      // If we weren't able to figure out whether this executor is a
-      // command executor above (e.g., no pending tasks), we deduce
-      // it from the ExecutorInfo. This is a hack for now.
-      if (isCommandExecutor.isNone()) {
-        isCommandExecutor = strings::contains(
-            executor->info.command().value(),
-            path::join(flags.launcher_dir, "mesos-executor"));
-      }
-
-      if (!isCommandExecutor.get()) {
+      // Only send ExitedExecutorMessage if it is not a Command
+      // Executor because the master doesn't store them; they are
+      // generated by the slave.
+      if (!executor->commandExecutor) {
         ExitedExecutorMessage message;
         message.mutable_slave_id()->MergeFrom(info.id());
         message.mutable_framework_id()->MergeFrom(frameworkId);
@@ -2998,6 +2993,9 @@ Executor::Executor(
     uuid(_uuid),
     directory(_directory),
     checkpoint(_checkpoint),
+    commandExecutor(strings::contains(
+        info.command().value(),
+        path::join(slave->flags.launcher_dir, "mesos-executor"))),
     pid(UPID()),
     resources(_info.resources()),
     completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)

http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca371ce/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 93fd32e..ef8b64f 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -375,6 +375,8 @@ struct Executor
 
   const bool checkpoint;
 
+  const bool commandExecutor;
+
   UPID pid;
 
   Resources resources; // Currently consumed resources.


[02/18] git commit: Added checkpointing ability for example frameworks.

Posted by vi...@apache.org.
Added checkpointing ability for example frameworks.

Review: https://reviews.apache.org/r/13398


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab8d5d3c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab8d5d3c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab8d5d3c

Branch: refs/heads/master
Commit: ab8d5d3c4a9d8504e8e3579692cc92d21ada7a46
Parents: 853c9ba
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 18:05:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:27 2013 -0700

----------------------------------------------------------------------
 src/examples/java/TestFramework.java  | 14 +++++++++++---
 src/examples/long_lived_framework.cpp |  7 +++++++
 src/examples/python/test_framework.py |  6 ++++++
 src/examples/test_framework.cpp       |  7 +++++++
 4 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index d130b08..687ed0d 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -145,10 +145,18 @@ public class TestFramework {
       .setSource("java_test")
       .build();
 
-    FrameworkInfo framework = FrameworkInfo.newBuilder()
+    FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
         .setUser("") // Have Mesos fill in the current user.
-        .setName("Test Framework (Java)")
-        .build();
+        .setName("Test Framework (Java)");
+
+    // TODO(vinod): Make checkpointing the default when it is default
+    // on the slave.
+    if (System.getenv("MESOS_CHECKPOINT") != null) {
+      System.out.println("Enabling checkpoint for the framework");
+      frameworkBuilder.setCheckpoint(true);
+    }
+
+    FrameworkInfo framework = frameworkBuilder.build();
 
     MesosSchedulerDriver driver = args.length == 1
       ? new MesosSchedulerDriver(

http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 9c86481..08ccc75 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -178,6 +178,13 @@ int main(int argc, char** argv)
   framework.set_user(""); // Have Mesos fill in the current user.
   framework.set_name("Long Lived Framework (C++)");
 
+  // TODO(vinod): Make checkpointing the default when it is default
+  // on the slave.
+  if (getenv("MESOS_CHECKPOINT")) {
+    cout << "Enabling checkpoint for the framework" << endl;
+    framework.set_checkpoint(true);
+  }
+
   MesosSchedulerDriver driver(&scheduler, framework, argv[1]);
 
   return driver.run() == DRIVER_STOPPED ? 0 : 1;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/python/test_framework.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index 39dcb05..eb20127 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -129,6 +129,12 @@ if __name__ == "__main__":
   framework.user = "" # Have Mesos fill in the current user.
   framework.name = "Test Framework (Python)"
 
+  # TODO(vinod): Make checkpointing the default when it is default
+  # on the slave.
+  if os.getenv("MESOS_CHECKPOINT"):
+    print "Enabling checkpoint for the framework";
+    framework.checkpoint = True
+
   driver = mesos.MesosSchedulerDriver(
     TestScheduler(executor),
     framework,

http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index f91d57a..0065de1 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -210,6 +210,13 @@ int main(int argc, char** argv)
   framework.set_name("Test Framework (C++)");
   framework.set_role(role);
 
+  // TODO(vinod): Make checkpointing the default when it is default
+  // on the slave.
+  if (getenv("MESOS_CHECKPOINT")) {
+    cout << "Enabling checkpoint for the framework" << endl;
+    framework.set_checkpoint(true);
+  }
+
   MesosSchedulerDriver driver(&scheduler, framework, master.get());
 
   return driver.run() == DRIVER_STOPPED ? 0 : 1;


[05/18] git commit: Fixed slave to offer total disk by default.

Posted by vi...@apache.org.
Fixed slave to offer total disk by default.

Review: https://reviews.apache.org/r/13395


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24991c9d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24991c9d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24991c9d

Branch: refs/heads/master
Commit: 24991c9d03b1b08ba25aa329026dbd7e7b40f38e
Parents: fbbb648
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:54:58 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/24991c9d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index aa0dbd7..5fa1fa7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -176,11 +176,11 @@ void Slave::initialize()
   if (!resources.disk().isSome()) {
     Bytes disk;
 
-    // NOTE: We calculate disk availability of the file system on
+    // NOTE: We calculate disk size of the file system on
     // which the slave work directory is mounted.
-    Try<Bytes> disk_ = fs::available(flags.work_dir);
+    Try<Bytes> disk_ = fs::size(flags.work_dir);
     if (!disk_.isSome()) {
-      LOG(WARNING) << "Failed to auto-detect the free disk space: '"
+      LOG(WARNING) << "Failed to auto-detect the disk space: '"
                    << disk_.error()
                    << "' ; defaulting to " << DEFAULT_DISK;
       disk = DEFAULT_DISK;


[03/18] git commit: Added fs::size() to calculate the total disk space and killed fs::available().

Posted by vi...@apache.org.
Added fs::size() to calculate the total disk space
and killed fs::available().

Review: https://reviews.apache.org/r/13394


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbbb648a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbbb648a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbbb648a

Branch: refs/heads/master
Commit: fbbb648a461c2b4f5ba8ff89328ec1c3053690f9
Parents: 9ce2edd
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:53:58 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fbbb648a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
index e405f96..3a20e86 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
@@ -16,14 +16,14 @@
 // a struct, and move this back into os.hpp.
 namespace fs {
 
-// Returns the total available disk size in bytes.
-inline Try<Bytes> available(const std::string& path = "/")
+// Returns the total disk size in bytes.
+inline Try<Bytes> size(const std::string& path = "/")
 {
   struct statvfs buf;
   if (::statvfs(path.c_str(), &buf) < 0) {
     return ErrnoError();
   }
-  return Bytes(buf.f_bavail * buf.f_frsize);
+  return Bytes(buf.f_blocks * buf.f_frsize);
 }
 
 


[11/18] git commit: Improved task validation error messages in master.

Posted by vi...@apache.org.
Improved task validation error messages in master.

Review: https://reviews.apache.org/r/13454


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/930aca13
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/930aca13
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/930aca13

Branch: refs/heads/master
Commit: 930aca1367afa0ad139300b1f8f8ee2e7d871f00
Parents: 376cd66
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 16:18:05 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:42:32 2013 -0700

----------------------------------------------------------------------
 src/common/type_utils.hpp           |  8 +++++++
 src/master/master.cpp               | 36 ++++++++++++++++++--------------
 src/slave/slave.cpp                 |  3 +++
 src/tests/resource_offers_tests.cpp | 10 +++++----
 4 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 9320ced..674a882 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -89,6 +89,14 @@ inline std::ostream& operator << (std::ostream& stream, const SlaveInfo& slave)
 }
 
 
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const ExecutorInfo& executor)
+{
+  return stream << executor.DebugString();
+}
+
+
 inline bool operator == (const FrameworkID& left, const FrameworkID& right)
 {
   return left.value() == right.value();

http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6530008..2b60e32 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1515,12 +1515,11 @@ struct ResourceUsageChecker : TaskInfoVisitor
     Resources taskResources = task.resources();
 
     if (!((usedResources + taskResources) <= offer->resources())) {
-      LOG(WARNING) << "Task " << task.task_id() << " attempted to use "
-                   << taskResources << " combined with already used "
-                   << usedResources << " is greater than offered "
-                   << offer->resources();
-
-      return TaskInfoError::some("Task uses more resources than offered");
+      return TaskInfoError::some(
+          "Task " + stringify(task.task_id()) + " attempted to use " +
+          stringify(taskResources) + " combined with already used " +
+          stringify(usedResources) + " is greater than offered " +
+          stringify(offer->resources()));
     }
 
     // Check this task's executor's resources.
@@ -1530,9 +1529,9 @@ struct ResourceUsageChecker : TaskInfoVisitor
       foreach (const Resource& resource, task.executor().resources()) {
         if (!Resources::isAllocatable(resource)) {
           // TODO(benh): Send back the invalid resources?
-          LOG(WARNING) << "Executor for task " << task.task_id()
-                       << " uses invalid resources " << resource;
-          return TaskInfoError::some("Task's executor uses invalid resources");
+          return TaskInfoError::some(
+              "Executor for task " + stringify(task.task_id()) +
+              " uses invalid resources " + stringify(resource));
         }
       }
 
@@ -1542,13 +1541,11 @@ struct ResourceUsageChecker : TaskInfoVisitor
         if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
           taskResources += task.executor().resources();
           if (!((usedResources + taskResources) <= offer->resources())) {
-            LOG(WARNING) << "Task " << task.task_id() << " + executor attempted"
-                         << " to use " << taskResources << " combined with"
-                         << " already used " << usedResources << " is greater"
-                         << " than offered " << offer->resources();
-
             return TaskInfoError::some(
-                "Task + executor uses more resources than offered");
+                "Task " + stringify(task.task_id()) + " + executor attempted" +
+                " to use " + stringify(taskResources) + " combined with" +
+                " already used " + stringify(usedResources) + " is greater" +
+                " than offered " + stringify(offer->resources()));
           }
         }
         executors.insert(task.executor().executor_id());
@@ -1588,7 +1585,14 @@ struct ExecutorInfoChecker : TaskInfoVisitor
         if (!(task.executor() == executorInfo)) {
           return TaskInfoError::some(
               "Task has invalid ExecutorInfo (existing ExecutorInfo"
-              " with same ExecutorID is not compatible)");
+              " with same ExecutorID is not compatible).\n"
+              "------------------------------------------------------------\n"
+              "Existing ExecutorInfo:\n" +
+              stringify(executorInfo) + "\n"
+              "------------------------------------------------------------\n"
+              "Task's ExecutorInfo:\n" +
+              stringify(task.executor()) + "\n"
+              "------------------------------------------------------------\n");
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e8176d2..83c250a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2593,8 +2593,11 @@ Future<Nothing> Slave::recover(bool reconnect, bool strict)
   if (reconnect && !(info == state.get().info.get())) {
     EXIT(1)
       << "Incompatible slave info detected.\n"
+      << "------------------------------------------------------------\n"
       << "Old slave info:\n" << state.get().info.get() << "\n"
+      << "------------------------------------------------------------\n"
       << "New slave info:\n" << info << "\n"
+      << "------------------------------------------------------------\n"
       << "To properly upgrade the slave do as follows:\n"
       << "Step 1: Start the slave with --recover=cleanup.\n"
       << "Step 2: Wait till the slave kills all executors and shuts down.\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index a96e775..3888e46 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -23,6 +23,8 @@
 #include <mesos/executor.hpp>
 #include <mesos/scheduler.hpp>
 
+#include <stout/strings.hpp>
+
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
@@ -252,7 +254,8 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
   EXPECT_TRUE(status.get().has_message());
-  EXPECT_EQ("Task uses more resources than offered", status.get().message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(), "greater than offered"));
 
   driver.stop();
   driver.join();
@@ -570,9 +573,8 @@ TEST_F(MultipleExecutorsTest, TasksExecutorInfoDiffers)
   EXPECT_EQ(task2.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
   EXPECT_TRUE(status.get().has_message());
-  EXPECT_EQ("Task has invalid ExecutorInfo (existing ExecutorInfo"
-            " with same ExecutorID is not compatible)",
-            status.get().message());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(), "Task has invalid ExecutorInfo"));
 
   EXPECT_CALL(exec, shutdown(_))
     .Times(AtMost(1));


[12/18] git commit: Implemented a LinkedHashMap that preserves the insertion order of keys of a hashmap.

Posted by vi...@apache.org.
Implemented a LinkedHashMap that preserves the insertion
order of keys of a hashmap.

Review: https://reviews.apache.org/r/13464


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2ebf17f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2ebf17f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2ebf17f

Branch: refs/heads/master
Commit: a2ebf17f030697696f109db4391a6d9dc9225232
Parents: 930aca1
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat Aug 10 01:13:55 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:49:29 2013 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/3rdparty/Makefile.am        |  1 +
 3rdparty/libprocess/3rdparty/stout/Makefile.am  |  2 +
 .../3rdparty/stout/include/stout/hashmap.hpp    |  2 +
 .../stout/include/stout/linkedhashmap.hpp       | 92 +++++++++++++++++++
 .../stout/tests/linkedhashmap_tests.cpp         | 93 ++++++++++++++++++++
 5 files changed, 190 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 0cd407c..e8561df 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -123,6 +123,7 @@ stout_tests_SOURCES =				\
   $(STOUT)/tests/gzip_tests.cpp			\
   $(STOUT)/tests/hashset_tests.cpp		\
   $(STOUT)/tests/json_tests.cpp			\
+  $(STOUT)/tests/linkedhashmap_tests.cpp	\
   $(STOUT)/tests/main.cpp			\
   $(STOUT)/tests/multimap_tests.cpp		\
   $(STOUT)/tests/none_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am b/3rdparty/libprocess/3rdparty/stout/Makefile.am
index e465fd1..0428aa8 100644
--- a/3rdparty/libprocess/3rdparty/stout/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am
@@ -26,6 +26,7 @@ EXTRA_DIST =					\
   include/stout/hashset.hpp			\
   include/stout/json.hpp			\
   include/stout/lambda.hpp			\
+  include/stout/linkedhashmap.hpp		\
   include/stout/multihashmap.hpp		\
   include/stout/multimap.hpp			\
   include/stout/net.hpp				\
@@ -66,6 +67,7 @@ EXTRA_DIST =					\
   tests/gzip_tests.cpp				\
   tests/hashset_tests.cpp			\
   tests/json_tests.cpp				\
+  tests/linkedhashmap_tests.cpp			\
   tests/main.cpp				\
   tests/multimap_tests.cpp			\
   tests/none_tests.cpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
index 796cb50..cea6988 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
@@ -53,6 +53,7 @@ public:
   }
 
   // Returns the set of keys in this map.
+  // TODO(vinod/bmahler): Should return a list instead.
   hashset<Key> keys() const
   {
     hashset<Key> result;
@@ -63,6 +64,7 @@ public:
   }
 
   // Returns the set of values in this map.
+  // TODO(vinod/bmahler): Should return a list instead.
   hashset<Value> values() const
   {
     hashset<Value> result;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
new file mode 100644
index 0000000..a27ec26
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -0,0 +1,92 @@
+#ifndef __STOUT_LINKEDHASHMAP_HPP__
+#define __STOUT_LINKEDHASHMAP_HPP__
+
+#include <list>
+#include <utility>
+
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+
+// Implementation of a hashmap that maintains the insertion order
+// of the keys. Note that re-insertion of a key (i.e., update)
+// doesn't update its insertion order.
+// TODO(vinod/bmahler): Consider extending from stout::hashmap and/or
+// having a compatible API with stout::hashmap.
+template <typename Key, typename Value>
+class LinkedHashMap
+{
+public:
+  typedef std::list<Key> list;
+  typedef hashmap<Key, std::pair<Value, typename list::iterator> > map;
+
+  Value& operator[] (const Key& key)
+  {
+    if (!values_.contains(key)) {
+      // Insert into the list and get the "pointer" into the list.
+      typename list::iterator i = keys_.insert(keys_.end(), key);
+      values_[key] = std::make_pair(Value(), i); // Store default value.
+    }
+    return values_[key].first;
+  }
+
+  Option<Value> get(const Key& key) const
+  {
+    if (values_.contains(key)) {
+      return values_.at(key).first;
+    }
+    return None();
+  }
+
+  bool contains(const Key& key) const
+  {
+    return values_.contains(key);
+  }
+
+  size_t erase(const Key& key)
+  {
+    if (values_.contains(key)) {
+      // Get the "pointer" into the list.
+      typename list::iterator i = values_[key].second;
+      keys_.erase(i);
+      return values_.erase(key);
+    }
+    return 0;
+  }
+
+  std::list<Key> keys() const
+  {
+    return keys_;
+  }
+
+  std::list<Value> values() const
+  {
+    std::list<Value> result;
+    foreach (const Key& key, keys_) {
+      result.push_back(values_.at(key).first);
+    }
+    return result;
+  }
+
+  size_t size() const
+  {
+    return keys_.size();
+  }
+
+  bool empty() const
+  {
+    return keys_.empty();
+  }
+
+  void clear()
+  {
+    values_.clear();
+    keys_.clear();
+  }
+
+private:
+  list keys_;  // Keys ordered by the insertion order.
+  map values_;  // Map of values and "pointers" to the linked list.
+};
+
+
+#endif // __STOUT_LINKEDHASHMAP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
new file mode 100644
index 0000000..aca97ca
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
@@ -0,0 +1,93 @@
+#include <stdint.h>
+
+#include <gtest/gtest.h>
+
+#include <list>
+#include <string>
+
+#include <stout/gtest.hpp>
+#include <stout/linkedhashmap.hpp>
+
+using std::list;
+using std::string;
+
+TEST(LinkedHashmapTest, Put)
+{
+  LinkedHashMap<string, int> map;
+
+  map["foo"] = 1;
+  ASSERT_SOME_EQ(1, map.get("foo"));
+  ASSERT_EQ(1, map.size());
+
+  map["bar"] = 2;
+  ASSERT_SOME_EQ(2, map.get("bar"));
+  ASSERT_EQ(2, map.size());
+
+  map["foo"] = 3;
+  ASSERT_SOME_EQ(3, map.get("foo"));
+  ASSERT_EQ(2, map.size());
+}
+
+
+TEST(LinkedHashmapTest, Contains)
+{
+  LinkedHashMap<string, int> map;
+  map["foo"] = 1;
+  map["bar"] = 2;
+  ASSERT_TRUE(map.contains("foo"));
+  ASSERT_TRUE(map.contains("bar"));
+  ASSERT_FALSE(map.contains("caz"));
+}
+
+
+TEST(LinkedHashmapTest, Erase)
+{
+  LinkedHashMap<string, int> map;
+
+  map["foo"] = 1;
+  map["bar"] = 2;
+  ASSERT_EQ(2, map.size());
+
+  ASSERT_EQ(1, map.erase("foo"));
+  ASSERT_EQ(0, map.erase("caz")); // Non-existent key.
+  ASSERT_NONE(map.get("foo"));
+  ASSERT_EQ(1, map.size());
+  ASSERT_SOME_EQ(2, map.get("bar"));
+}
+
+
+TEST(LinkedHashmapTest, Keys)
+{
+  LinkedHashMap<string, int> map;
+
+  std::list<string> keys;
+  keys.push_back("foo");
+  keys.push_back("bar");
+  keys.push_back("food");
+  keys.push_back("rad");
+  keys.push_back("cat");
+
+  // Insert keys into the map.
+  foreach (const string& key, keys) {
+    map[key] = 1;
+  }
+  map["foo"] = 1; // Re-insert a key.
+
+  // Ensure the keys returned are the same as insertion order.
+  ASSERT_EQ(keys, map.keys());
+}
+
+
+TEST(LinkedHashmapTest, Values)
+{
+  LinkedHashMap<string, int> map;
+
+  map["foo"] = 1;
+  map["bar"] = 2;
+  map["caz"] = 3;
+
+  int val = 0;
+  foreach (int value, map.values()) {
+   ASSERT_EQ(++val, value);
+  }
+}


[15/18] git commit: Fixed slave to always call Isolator::resourcesChanged() when an executor re-registers.

Posted by vi...@apache.org.
Fixed slave to always call Isolator::resourcesChanged() when an
executor re-registers.

Review: https://reviews.apache.org/r/13503


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e79eef3f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e79eef3f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e79eef3f

Branch: refs/heads/master
Commit: e79eef3fe8e961f5236aaeec1998006d0e200ed6
Parents: 7ca371c
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 12:07:37 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e79eef3f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 803da8d..cf9f292 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1648,6 +1648,13 @@ void Slave::reregisterExecutor(
         statusUpdate(update); // This also updates the executor's resources!
       }
 
+      // Tell the isolator to update the resources.
+      dispatch(isolator,
+               &Isolator::resourcesChanged,
+               frameworkId,
+               executorId,
+               executor->resources);
+
       // Now, if there is any task still in STAGING state and not in
       // 'tasks' known to the executor, the slave must have died
       // before the executor received the task! Relaunch it!


[14/18] git commit: Fixed executor driver to store tasks and updates in LinkedHashMap.

Posted by vi...@apache.org.
Fixed executor driver to store tasks and updates in LinkedHashMap.

Review: https://reviews.apache.org/r/13486


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/755308dc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/755308dc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/755308dc

Branch: refs/heads/master
Commit: 755308dca1976b2df08ef264108fff0e7fdc6aa9
Parents: 010fa31
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 17:58:50 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:40 2013 -0700

----------------------------------------------------------------------
 src/exec/exec.cpp | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/755308dc/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index d467724..1a0dd07 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -33,7 +33,7 @@
 #include <process/protobuf.hpp>
 
 #include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/fatal.hpp>
 #include <stout/numify.hpp>
@@ -246,12 +246,16 @@ protected:
     message.mutable_framework_id()->MergeFrom(frameworkId);
 
     // Send all unacknowledged updates.
-    foreachvalue (const StatusUpdate& update, updates) {
+    // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+    // supports it.
+    foreach (const StatusUpdate& update, updates.values()) {
       message.add_updates()->MergeFrom(update);
     }
 
     // Send all unacknowledged tasks.
-    foreachvalue (const TaskInfo& task, tasks) {
+    // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+    // supports it.
+    foreach (const TaskInfo& task, tasks.values()) {
       message.add_tasks()->MergeFrom(task);
     }
 
@@ -494,13 +498,13 @@ private:
   const string directory;
   bool checkpoint;
 
-  hashmap<UUID, StatusUpdate> updates; // Unacknowledged updates.
+  LinkedHashMap<UUID, StatusUpdate> updates; // Unacknowledged updates.
 
   // We store tasks that have not been acknowledged
   // (via status updates) by the slave. This ensures that, during
   // recovery, the slave relaunches only those tasks that have
   // never reached this executor.
-  hashmap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+  LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
 };
 
 } // namespace internal {


[06/18] git commit: Improved the allocator to not offer non-checkpointing slave resources to checkpointing frameworks.

Posted by vi...@apache.org.
Improved the allocator to not offer non-checkpointing slave resources
to checkpointing frameworks.

Review: https://reviews.apache.org/r/13407


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea6c766b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea6c766b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea6c766b

Branch: refs/heads/master
Commit: ea6c766b57b1b59e218e4fc47befede762e9231a
Parents: 6da3267
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 22:46:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700

----------------------------------------------------------------------
 src/master/hierarchical_allocator_process.hpp | 25 ++++++++++++++++------
 src/master/master.cpp                         | 15 ++++---------
 src/tests/slave_recovery_tests.cpp            | 17 ++++++++-------
 3 files changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 76465eb..183b205 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -61,6 +61,7 @@ struct Slave
   Slave(const SlaveInfo& _info)
     : available(_info.resources()),
       whitelisted(false),
+      checkpoint(_info.checkpoint()),
       info(_info) {}
 
   Resources resources() const { return info.resources(); }
@@ -74,6 +75,7 @@ struct Slave
   // frameworks.
   bool whitelisted;
 
+  bool checkpoint;
 private:
   SlaveInfo info;
 };
@@ -84,13 +86,15 @@ struct Framework
   Framework() {}
 
   Framework(const FrameworkInfo& _info)
-    : info(_info) {}
+    : checkpoint(_info.checkpoint()),
+      info(_info) {}
 
   std::string role() const { return info.role(); }
 
   // Filters that have been added by this framework.
   hashset<Filter*> filters;
 
+  bool checkpoint;
 private:
   FrameworkInfo info;
 };
@@ -773,19 +777,28 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
     const SlaveID& slaveId,
     const Resources& resources)
 {
-  bool filtered = false;
-
   CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+
+  // Do not offer a non-checkpointing slave's resources to a checkpointing
+  // framework. This is a short term fix until the following is resolved:
+  // https://issues.apache.org/jira/browse/MESOS-444.
+  if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
+    VLOG(1) << "Filtered " << resources
+            << " on non-checkpointing slave " << slaveId
+            << " for checkpointing framework " << frameworkId;
+    return true;
+  }
+
   foreach (Filter* filter, frameworks[frameworkId].filters) {
     if (filter->filter(slaveId, resources)) {
       VLOG(1) << "Filtered " << resources
               << " on slave " << slaveId
               << " for framework " << frameworkId;
-      filtered = true;
-      break;
+      return true;
     }
   }
-  return filtered;
+  return false;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b6d12a3..0675b52 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1349,17 +1349,10 @@ void Master::offer(const FrameworkID& frameworkId,
 
     Slave* slave = slaves[slaveId];
 
-    // Do not offer a non-checkpointing slave's resources to a checkpointing
-    // framework. This is a short term fix until the following is resolved:
-    // https://issues.apache.org/jira/browse/MESOS-444.
-    if (framework->info.checkpoint() && !slave->info.checkpoint()) {
-      LOG(WARNING) << "Master returning resources offered to checkpointing "
-                   << "framework " << frameworkId << " because slave "
-                   << slaveId << " is not checkpointing";
-
-      allocator->resourcesRecovered(frameworkId, slaveId, offered);
-      continue;
-    }
+    CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
+        << "Resources of non checkpointing slave " << slaveId
+        << " (" << slave->info.hostname() << ") are being offered to"
+        << " checkpointing framework " << frameworkId;
 
     // This could happen if the allocator dispatched 'Master::offer' before
     // it received 'Allocator::slaveRemoved' from the master.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index a0734c3..6de8108 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1034,9 +1034,16 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
   slave::Flags flags = this->CreateSlaveFlags();
   flags.checkpoint = false;
 
+  Clock::pause();
+
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+    FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
   Try<PID<Slave> > slave = this->StartSlave(&isolator, flags);
   ASSERT_SOME(slave);
 
+  AWAIT_READY(registerSlaveMessage);
+
   MockScheduler sched;
 
   // Enable checkpointing for the framework.
@@ -1053,17 +1060,11 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
   EXPECT_CALL(sched, resourceOffers(_, _))
     .Times(0); // No offers should be received!
 
-  Future<Nothing> offer = FUTURE_DISPATCH(_, &Master::offer);
-
-  Clock::pause();
-
   driver.start();
 
-  AWAIT_READY(registered);
-
-  // Wait for an offer to be made. We do a Clock::settle() here
+  // Wait for scheduler to register. We do a Clock::settle() here
   // to ensure that no offers are received by the scheduler.
-  AWAIT_READY(offer);
+  AWAIT_READY(registered);
   Clock::settle();
 
   driver.stop();


[04/18] git commit: Fixed slave to always delete the latest slave symlink on shutdown or cleanup.

Posted by vi...@apache.org.
Fixed slave to always delete the latest slave symlink on
shutdown or cleanup.

Review: https://reviews.apache.org/r/13392


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ce2edda
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ce2edda
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ce2edda

Branch: refs/heads/master
Commit: 9ce2edda4f1daf2b9e68bcc2ffe2e1a0c072ed73
Parents: ea6c766
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:29:26 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9ce2edda/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b835ac7..aa0dbd7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -474,12 +474,13 @@ void Slave::finalize()
     }
   }
 
-  if (flags.checkpoint &&
-      (state == TERMINATING || flags.recover == "cleanup")) {
+  if (state == TERMINATING || flags.recover == "cleanup") {
     // We remove the "latest" symlink in meta directory, so that the
     // slave doesn't recover the state when it restarts and registers
     // as a new slave with the master.
-    CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+    if (os::exists(paths::getLatestSlavePath(metaDir))) {
+      CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+    }
   }
 
   // Stop the isolator.
@@ -1143,8 +1144,8 @@ void Slave::shutdownFramework(const FrameworkID& frameworkId)
   // its a message from the currently registered master.
   if (from && from != master) {
     LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
-                 << " from " << from << "because it is not from the registered "
-                 << "master (" << master << ")";
+                 << " from " << from << " because it is not from the registered"
+                 << " master (" << master << ")";
     return;
   }
 


[13/18] git commit: Fixed slave to use LinkedHashMap to store tasks.

Posted by vi...@apache.org.
Fixed slave to use LinkedHashMap to store tasks.

Review: https://reviews.apache.org/r/13485


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/010fa314
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/010fa314
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/010fa314

Branch: refs/heads/master
Commit: 010fa31495cbede6528f2a576497e9b890fcebfb
Parents: a2ebf17
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 17:09:17 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:56:36 2013 -0700

----------------------------------------------------------------------
 src/slave/http.cpp  | 18 +++++++++++++-----
 src/slave/slave.cpp | 40 ++++++++++++++++++++++++++++++----------
 src/slave/slave.hpp |  7 ++++---
 3 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index c45dfb4..073d092 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -204,22 +204,30 @@ JSON::Object model(const Executor& executor)
   object.values["resources"] = model(executor.resources);
 
   JSON::Array tasks;
-  foreachvalue (Task* task, executor.launchedTasks) {
+  foreach (Task* task, executor.launchedTasks.values()) {
     tasks.values.push_back(model(*task));
   }
   object.values["tasks"] = tasks;
 
   JSON::Array queued;
-  foreachvalue (const TaskInfo& task, executor.queuedTasks) {
+  foreach (const TaskInfo& task, executor.queuedTasks.values()) {
     queued.values.push_back(model(task));
   }
   object.values["queued_tasks"] = queued;
 
-  JSON::Array completedTasks;
+  JSON::Array completed;
   foreach (const Task& task, executor.completedTasks) {
-    completedTasks.values.push_back(model(task));
+    completed.values.push_back(model(task));
   }
-  object.values["completed_tasks"] = completedTasks;
+
+  // NOTE: We add 'terminatedTasks' to 'completed_tasks' for
+  // simplicity.
+  // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+  // supports it.
+  foreach (Task* task, executor.terminatedTasks.values()) {
+    completed.values.push_back(model(*task));
+  }
+  object.values["completed_tasks"] = completed;
 
   return object;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 83c250a..11e4826 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -714,12 +714,16 @@ void Slave::doReliableRegistration()
         executorInfo->mutable_framework_id()->MergeFrom(framework->id);
 
         // Add launched tasks.
-        foreachvalue (Task* task, executor->launchedTasks) {
+        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+        // supports it.
+        foreach (Task* task, executor->launchedTasks.values()) {
           message.add_tasks()->CopyFrom(*task);
         }
 
         // Add queued tasks.
-        foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+        // supports it.
+        foreach (const TaskInfo& task, executor->queuedTasks.values()) {
           const Task& t = protobuf::createTask(
               task, TASK_STAGING, executor->id, framework->id);
 
@@ -727,7 +731,9 @@ void Slave::doReliableRegistration()
         }
 
         // Add terminated tasks.
-        foreachvalue (Task* task, executor->terminatedTasks) {
+        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+        // supports it.
+        foreach (Task* task, executor->terminatedTasks.values()) {
           message.add_tasks()->CopyFrom(*task);
         }
       }
@@ -1509,7 +1515,9 @@ void Slave::registerExecutor(
       }
 
       // First account for the tasks we're about to start.
-      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+      // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (const TaskInfo& task, executor->queuedTasks.values()) {
         // Add the task to the executor.
         executor->addTask(task);
       }
@@ -1534,7 +1542,9 @@ void Slave::registerExecutor(
       message.mutable_slave_info()->MergeFrom(info);
       send(executor->pid, message);
 
-      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+      // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (const TaskInfo& task, executor->queuedTasks.values()) {
         LOG(INFO) << "Flushing queued task " << task.task_id()
                   << " for executor '" << executor->id << "'"
                   << " of framework " << framework->id;
@@ -1641,7 +1651,9 @@ void Slave::reregisterExecutor(
         launched[task.task_id()] = task;
       }
 
-      foreachvalue (Task* task, executor->launchedTasks) {
+      // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+      // supports it.
+      foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !launched.contains(task->task_id())) {
 
@@ -2138,7 +2150,9 @@ void Slave::executorTerminated(
         StatusUpdate update;
 
         // Transition all live launched tasks.
-        foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
+        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+        // supports it.
+        foreach (Task* task, executor->launchedTasks.values()) {
           if (!protobuf::isTerminalState(task->state())) {
             mesos::TaskState taskState;
             isCommandExecutor = !task->has_executor_id();
@@ -2158,8 +2172,9 @@ void Slave::executorTerminated(
         }
 
         // Transition all queued tasks.
-        foreachvalue (const TaskInfo& task,
-                      utils::copy(executor->queuedTasks)) {
+        // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+        // supports it.
+        foreach (const TaskInfo& task, executor->queuedTasks.values()) {
           mesos::TaskState taskState;
           isCommandExecutor = task.has_command();
           if (destroyed || isCommandExecutor.get()) {
@@ -3006,7 +3021,12 @@ Executor::Executor(
 Executor::~Executor()
 {
   // Delete the tasks.
-  foreachvalue (Task* task, launchedTasks) {
+  // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+  // supports it.
+  foreach (Task* task, launchedTasks.values()) {
+    delete task;
+  }
+  foreach (Task* task, terminatedTasks.values()) {
     delete task;
   }
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 464d224..93fd32e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -34,6 +34,7 @@
 #include <process/protobuf.hpp>
 
 #include <stout/bytes.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/multihashmap.hpp>
@@ -378,9 +379,9 @@ struct Executor
 
   Resources resources; // Currently consumed resources.
 
-  hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
-  hashmap<TaskID, Task*> launchedTasks;  // Running.
-  hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+  LinkedHashMap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+  LinkedHashMap<TaskID, Task*> launchedTasks;  // Running.
+  LinkedHashMap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
   boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
 private:


[17/18] git commit: Fixed master to properly reconcile completed frameworks when slave re-registers.

Posted by vi...@apache.org.
Fixed master to properly reconcile completed frameworks when
slave re-registers.

Review: https://reviews.apache.org/r/13508


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bc167a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bc167a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bc167a8

Branch: refs/heads/master
Commit: 3bc167a8137a13287104ff884e1b940fbf95ae4f
Parents: e79eef3
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 15:01:22 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp              | 26 ++++++++-
 src/tests/slave_recovery_tests.cpp | 93 +++++++++++++++++++++++++++++++++
 2 files changed, 118 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3bc167a8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2b60e32..d53b8bb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -54,7 +54,7 @@ using process::wait; // Necessary on some OS's to disambiguate.
 
 using std::tr1::cref;
 using std::tr1::bind;
-
+using std::tr1::shared_ptr;
 
 namespace mesos {
 namespace internal {
@@ -1775,6 +1775,9 @@ Resources Master::launchTask(const TaskInfo& task,
 }
 
 
+// NOTE: This function is only called when the slave re-registers
+// with a master that already knows about it (i.e., not a failed
+// over master).
 void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
 {
   CHECK_NOTNULL(slave);
@@ -1824,6 +1827,27 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
       send(slave->pid, message);
     }
   }
+
+  // Send ShutdownFrameworkMessages for frameworks that are completed.
+  // This could happen if the message wasn't received by the slave
+  // (e.g., slave was down, partitioned).
+  // NOTE: This is a short-term hack because this information is lost
+  // when the master fails over. Also, 'completedFrameworks' has a
+  // limited capacity.
+  // TODO(vinod): Revisit this when registrar is in place. It would
+  // likely involve storing this information in the registrar.
+  foreach (const shared_ptr<Framework>& framework, completedFrameworks) {
+    if (slaveTasks.contains(framework->id)) {
+      LOG(WARNING)
+        << "Slave " << slave->id << " (" << slave->info.hostname()
+        << ") re-registered with completed framework " << framework->id
+        << ". Shutting down the framework on the slave";
+
+      ShutdownFrameworkMessage message;
+      message.mutable_framework_id()->MergeFrom(framework->id);
+      send(slave->pid, message);
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3bc167a8/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index ef6dbf7..28a2628 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1601,3 +1601,96 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
 
   this->Shutdown(); // Shutdown before isolator(s) get deallocated.
 }
+
+
+// This test verifies that when the slave recovers and re-registers
+// with a framework that was shutdown when the slave was down, it gets
+// a ShutdownFramework message.
+TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  Future<RegisterSlaveMessage> registerSlaveMessage =
+    FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+  TypeParam isolator1;
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(registerSlaveMessage);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  // Capture the slave and framework ids.
+  SlaveID slaveId = offers.get()[0].slave_id();
+  FrameworkID frameworkId = offers.get()[0].framework_id();
+
+  EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING
+
+  Future<Nothing> _statusUpdateAcknowledgement =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Wait for TASK_RUNNING update to be acknowledged.
+  AWAIT_READY(_statusUpdateAcknowledgement);
+
+  this->Stop(slave.get());
+
+  Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
+    FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+
+  // Now stop the framework.
+  driver.stop();
+  driver.join();
+
+  // Wait until the framework is removed.
+  AWAIT_READY(unregisterFrameworkMessage);
+
+  Future<ShutdownFrameworkMessage> shutdownFrameworkMessage =
+    FUTURE_PROTOBUF(ShutdownFrameworkMessage(), _, _);
+
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  // Now restart the slave (use same flags) with a new isolator.
+  TypeParam isolator2;
+
+  slave = this->StartSlave(&isolator2, flags);
+  ASSERT_SOME(slave);
+
+  // Slave should get a ShutdownFrameworkMessage.
+  AWAIT_READY(shutdownFrameworkMessage);
+
+  // Ensure that the executor is terminated.
+  AWAIT_READY(executorTerminated);
+
+  this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}


[10/18] git commit: Fixed cgroups isolator to do setsid on the executor process.

Posted by vi...@apache.org.
Fixed cgroups isolator to do setsid on the executor process.

Review: https://reviews.apache.org/r/13449


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/376cd66c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/376cd66c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/376cd66c

Branch: refs/heads/master
Commit: 376cd66c4a6b71d5444acf615db1fc098747da75
Parents: 649295f
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 15:00:16 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:36:00 2013 -0700

----------------------------------------------------------------------
 src/slave/cgroups_isolator.cpp     | 51 ++++++++++++++++++++++++++++++++-
 src/slave/process_isolator.cpp     |  2 --
 src/tests/slave_recovery_tests.cpp |  6 ++++
 3 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 3427c62..d4ccd11 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -532,6 +532,19 @@ void CgroupsIsolator::launchExecutor(
   // Start listening on OOM events.
   oomListen(frameworkId, executorId);
 
+  // Use pipes to determine which child has successfully changed session.
+  int pipes[2];
+  if (pipe(pipes) < 0) {
+    PLOG(FATAL) << "Failed to create a pipe";
+  }
+
+  // Set the FD_CLOEXEC flags on these pipes
+  Try<Nothing> cloexec = os::cloexec(pipes[0]);
+  CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[0]";
+
+  cloexec = os::cloexec(pipes[1]);
+  CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[1]";
+
   // Launch the executor using fork-exec.
   pid_t pid;
   if ((pid = ::fork()) == -1) {
@@ -539,6 +552,15 @@ void CgroupsIsolator::launchExecutor(
   }
 
   if (pid > 0) {
+    os::close(pipes[1]);
+
+    // Get the child's pid via the pipe.
+    if (read(pipes[0], &pid, sizeof(pid)) == -1) {
+      PLOG(FATAL) << "Failed to get child PID from pipe";
+    }
+
+    os::close(pipes[0]);
+
     // In parent process.
     LOG(INFO) << "Forked executor at = " << pid;
 
@@ -558,7 +580,34 @@ void CgroupsIsolator::launchExecutor(
              executorId,
              pid);
   } else {
-    // In child process.
+    // In child process, we make cleanup easier by putting process
+    // into its own session. DO NOT USE GLOG!
+    os::close(pipes[0]);
+
+    // NOTE: We setsid() in a loop because setsid() might fail if another
+    // process has the same process group id as the calling process.
+    while ((pid = setsid()) == -1) {
+      perror("Could not put executor in its own session");
+
+      std::cout << "Forking another process and retrying ..." << std::endl;
+
+      if ((pid = fork()) == -1) {
+        perror("Failed to fork to launch executor");
+        abort();
+      }
+
+      if (pid > 0) {
+        // In parent process.
+        exit(0);
+      }
+    }
+
+    if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
+      perror("Failed to write PID on pipe");
+      abort();
+    }
+
+    os::close(pipes[1]);
 
     launcher::ExecutorLauncher launcher(
         slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a80b047..24a7fb2 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -191,8 +191,6 @@ void ProcessIsolator::launchExecutor(
 
       if (pid > 0) {
         // In parent process.
-        // It is ok to suicide here, though process reaper signals the exit,
-        // because the process isolator ignores unknown processes.
         exit(0);
       }
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6de8108..ef6dbf7 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -734,6 +734,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
 
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .Times(AtMost(1));
+
   Future<Nothing> schedule = FUTURE_DISPATCH(
       _, &GarbageCollectorProcess::schedule);
 
@@ -1366,6 +1369,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 
   AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
 
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .Times(AtMost(1));
+
   Future<Nothing> executorTerminated =
     FUTURE_DISPATCH(_, &Slave::executorTerminated);
 


[08/18] git commit: Fixed slave to not recover terminated executors.

Posted by vi...@apache.org.
Fixed slave to not recover terminated executors.

Review: https://reviews.apache.org/r/13450


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/649295f3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/649295f3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/649295f3

Branch: refs/heads/master
Commit: 649295f34d6b7a70314a52cd6db3f74208941b98
Parents: fd3584a
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 14:27:02 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700

----------------------------------------------------------------------
 src/slave/monitor.cpp               |  2 +-
 src/slave/slave.cpp                 | 13 +++++++++++--
 src/slave/status_update_manager.cpp | 13 ++++++-------
 src/slave/status_update_manager.hpp |  2 +-
 4 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 4f3c91f..8e1eb35 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -183,7 +183,7 @@ void ResourceMonitorProcess::_collect(
   } else {
     // Note that the isolator might have been terminated and pending
     // dispatches deleted, causing the future to get discarded.
-    LOG(WARNING)
+    VLOG(1)
       << "Failed to collect resource usage for executor '" << executorId
       << "' of framework '" << frameworkId << "': "
       << (statistics.isFailed() ? statistics.failure() : "Future discarded");

http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5fa1fa7..e8176d2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2348,8 +2348,8 @@ void _unwatch(
 {
   if (!unwatch.isReady()) {
     LOG(ERROR) << "Failed to unwatch executor " << executorId
-               << " of framework " << frameworkId
-               << ": " << unwatch.isFailed() ? unwatch.failure() : "discarded";
+               << " of framework " << frameworkId << ": "
+               << (unwatch.isFailed() ? unwatch.failure() : "discarded");
   }
 }
 
@@ -2619,6 +2619,15 @@ Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
 {
   foreachvalue(Framework* framework, frameworks){
     foreachvalue(Executor* executor, framework->executors) {
+      // If the executor is already terminating/terminated don't
+      // bother reconnecting or killing it. This could happen if
+      // the recovered isolator sent a 'ExecutorTerminated' message
+      // before the slave is here.
+      if (executor->state == Executor::TERMINATING ||
+          executor->state == Executor::TERMINATED) {
+        continue;
+      }
+
       // Monitor the executor.
       monitor.watch(
           framework->id,

http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index ffd4736..6d4598e 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -297,8 +297,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
   const TaskID& taskId = update.status().task_id();
   const FrameworkID& frameworkId = update.framework_id();
 
-  LOG(INFO) << "Received status update " << update
-            << " with checkpoint=" << stringify(checkpoint);
+  LOG(INFO) << "Received status update " << update;
 
   // Write the status update to disk and enqueue it to send it to the master.
   // Create/Get the status update stream for this task.
@@ -457,8 +456,8 @@ StatusUpdateStream* StatusUpdateManagerProcess::createStatusUpdateStream(
     const Option<ExecutorID>& executorId,
     const Option<UUID>& uuid)
 {
-  LOG(INFO) << "Creating StatusUpdate stream for task " << taskId
-            << " of framework " << frameworkId;
+  VLOG(1) << "Creating StatusUpdate stream for task " << taskId
+          << " of framework " << frameworkId;
 
   StatusUpdateStream* stream = new StatusUpdateStream(
       taskId, frameworkId, slaveId, flags, checkpoint, executorId, uuid);
@@ -488,9 +487,9 @@ void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
     const TaskID& taskId,
     const FrameworkID& frameworkId)
 {
-  LOG(INFO) << "Cleaning up status update stream"
-            << " for task " << taskId
-            << " of framework " << frameworkId;
+  VLOG(1) << "Cleaning up status update stream"
+          << " for task " << taskId
+          << " of framework " << frameworkId;
 
   CHECK(streams.contains(frameworkId))
     << "Cannot find the status update streams for framework " << frameworkId;

http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index da92760..ffc79ae 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -285,7 +285,7 @@ struct StatusUpdateStream
       return Error(error.get());
     }
 
-    LOG(INFO) << "Replaying status update stream for task " << taskId;
+    VLOG(1) << "Replaying status update stream for task " << taskId;
 
     foreach (const StatusUpdate& update, updates) {
       // Handle the update.


[09/18] git commit: Fixed master to properly handle TASK_LOST updates generated by it.

Posted by vi...@apache.org.
Fixed master to properly handle TASK_LOST updates generated by it.

Review: https://reviews.apache.org/r/13446


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd3584a7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd3584a7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd3584a7

Branch: refs/heads/master
Commit: fd3584a71f68015fcfff70cc889b32fac28f941b
Parents: ba2ee7c
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 11:08:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 121 ++++++++++++++-----------------
 src/messages/messages.proto         |   2 +
 src/tests/fault_tolerance_tests.cpp |  22 +++---
 3 files changed, 68 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0675b52..6530008 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -875,6 +875,9 @@ void Master::launchTasks(const FrameworkID& frameworkId,
         status->set_message("Task launched with invalid offer");
         update->set_timestamp(Clock::now().secs());
         update->set_uuid(UUID::random().toBytes());
+
+        LOG(INFO) << "Sending status update " << *update
+                  << " for launch task attempt on invalid offer " << offerId;
         send(framework->pid, message);
       }
     }
@@ -1163,10 +1166,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   // NOTE: We cannot use 'from' here to identify the slave as this is
   // now sent by the StatusUpdateManagerProcess. Only 'pid' can
   // be used to identify the slave.
-  LOG(INFO) << "Status update from " << pid
-            << ": task " << status.task_id()
-            << " of framework " << update.framework_id()
-            << " is now in state " << status.state();
+  LOG(INFO) << "Status update " << update << " from " << pid;
 
   Slave* slave = getSlave(update.slave_id());
   if (slave == NULL) {
@@ -1174,12 +1174,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
       // If the slave is deactivated, we have already informed
       // frameworks that its tasks were LOST, so the slave should
       // shut down.
-      LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " from deactivated slave " << pid
                    << " with id " << update.slave_id() << " ; asking slave "
                    << " to shutdown";
       send(pid, ShutdownMessage());
     } else {
-      LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+      LOG(WARNING) << "Ignoring status update " << update
+                   << " from unknown slave " << pid
                    << " with id " << update.slave_id();
     }
     stats.invalidStatusUpdates++;
@@ -1190,7 +1192,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   Framework* framework = getFramework(update.framework_id());
   if (framework == NULL) {
-    LOG(WARNING) << "Ignoring status update from " << pid << " ("
+    LOG(WARNING) << "Ignoring status update " << update
+                 << " from " << pid << " ("
                  << slave->info.hostname() << "): error, couldn't lookup "
                  << "framework " << update.framework_id();
     stats.invalidStatusUpdates++;
@@ -1206,7 +1209,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   // Lookup the task and see if we need to update anything locally.
   Task* task = slave->getTask(update.framework_id(), status.task_id());
   if (task == NULL) {
-    LOG(WARNING) << "Status update from " << pid << " ("
+    LOG(WARNING) << "Status update " << update
+                 << " from " << pid << " ("
                  << slave->info.hostname() << "): error, couldn't lookup "
                  << "task " << status.task_id();
     stats.invalidStatusUpdates++;
@@ -1225,10 +1229,11 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 }
 
 
-void Master::exitedExecutor(const SlaveID& slaveId,
-                            const FrameworkID& frameworkId,
-                            const ExecutorID& executorId,
-                            int32_t status)
+void Master::exitedExecutor(
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    int32_t status)
 {
   // Only update master's internal data structures here for properly accounting.
   // The TASK_LOST updates are handled by the slave.
@@ -1671,6 +1676,7 @@ void Master::processTasks(Offer* offer,
           TASK_LOST,
           error.get());
 
+      LOG(INFO) << "Sending status update " << update << " for invalid task";
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
       send(framework->pid, message);
@@ -1786,20 +1792,14 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
                    << " of framework " << task->framework_id()
                    << " unknown to the slave " << slave->id;
 
-      Framework* framework = getFramework(task->framework_id());
-      if (framework != NULL) {
-        const StatusUpdate& update = protobuf::createStatusUpdate(
-            task->framework_id(),
-            slave->id,
-            task->task_id(),
-            TASK_LOST,
-            "Task was not received by the slave");
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          task->framework_id(),
+          slave->id,
+          task->task_id(),
+          TASK_LOST,
+          "Task is unknown to the slave");
 
-        StatusUpdateMessage message;
-        message.mutable_update()->CopyFrom(update);
-        send(framework->pid, message);
-      }
-      removeTask(task);
+      statusUpdate(update, UPID());
     }
   }
 
@@ -1893,6 +1893,10 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
 
 void Master::removeFramework(Framework* framework)
 {
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Removing framework " << framework->id;
+
   if (framework->active) {
     // Tell the allocator to stop allocating resources to this framework.
     allocator->frameworkDeactivated(framework->id);
@@ -1961,6 +1965,10 @@ void Master::removeFramework(Slave* slave, Framework* framework)
   CHECK_NOTNULL(slave);
   CHECK_NOTNULL(framework);
 
+  LOG(INFO) << "Removing framework " << framework->id
+            << " from slave " << slave->id
+            << " (" << slave->info.hostname() << ")";
+
   // Remove pointers to framework's tasks in slaves, and send status updates.
   foreachvalue (Task* task, utils::copy(slave->tasks)) {
     // Remove tasks that belong to this framework.
@@ -1968,25 +1976,16 @@ void Master::removeFramework(Slave* slave, Framework* framework)
       // A framework might not actually exist because the master failed
       // over and the framework hasn't reconnected yet. For more info
       // please see the comments in 'removeFramework(Framework*)'.
-      StatusUpdateMessage message;
-      message.mutable_update()->CopyFrom(
-          protobuf::createStatusUpdate(
-              task->framework_id(),
-              task->slave_id(),
-              task->task_id(),
-              TASK_LOST,
-              "Slave " + slave->info.hostname() + " disconnected",
-              (task->has_executor_id() ?
-                  Option<ExecutorID>(task->executor_id()) : None())));
-
-      LOG(INFO) << "Sending status update " << message.update()
-                << " due to disconnected slave " << slave->id
-                << " (" << slave->info.hostname() << ")";
-
-      send(framework->pid, message);
-
-      // Remove the task from slave and framework.
-      removeTask(task);
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+        task->framework_id(),
+        task->slave_id(),
+        task->task_id(),
+        TASK_LOST,
+        "Slave " + slave->info.hostname() + " disconnected",
+        (task->has_executor_id()
+            ? Option<ExecutorID>(task->executor_id()) : None()));
+
+      statusUpdate(update, UPID());
     }
   }
 
@@ -2122,6 +2121,9 @@ void Master::removeSlave(Slave* slave)
 {
   CHECK_NOTNULL(slave);
 
+  LOG(INFO) << "Removing slave " << slave->id
+            << " (" << slave->info.hostname() << ")";
+
   // We do this first, to make sure any of the resources recovered
   // below (e.g., removeTask()) are ignored by the allocator.
   if (!slave->disconnected) {
@@ -2130,8 +2132,6 @@ void Master::removeSlave(Slave* slave)
 
   // Remove pointers to slave's tasks in frameworks, and send status updates
   foreachvalue (Task* task, utils::copy(slave->tasks)) {
-    Framework* framework = getFramework(task->framework_id());
-
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
     // situation for frameworks that want to have high-availability,
@@ -2140,25 +2140,16 @@ void Master::removeSlave(Slave* slave)
     // want to do is create a local Framework object to represent that
     // framework until it fails over. See the TODO above in
     // Master::reregisterSlave.
-    if (framework != NULL) {
-      StatusUpdateMessage message;
-      message.mutable_update()->CopyFrom(
-          protobuf::createStatusUpdate(
-              task->framework_id(),
-              task->slave_id(),
-              task->task_id(),
-              TASK_LOST,
-              "Slave " + slave->info.hostname() + " removed",
-              (task->has_executor_id() ?
-                  Option<ExecutorID>(task->executor_id()) : None())));
-
-      LOG(INFO) << "Sending status update " << message.update()
-                << " due to the removal of slave "
-                << slave->id << " (" << slave->info.hostname() << ")";
-
-      send(framework->pid, message);
-    }
-    removeTask(task);
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        task->framework_id(),
+        task->slave_id(),
+        task->task_id(),
+        TASK_LOST,
+        "Slave " + slave->info.hostname() + " removed",
+        (task->has_executor_id() ?
+            Option<ExecutorID>(task->executor_id()) : None()));
+
+    statusUpdate(update, UPID());
   }
 
   foreach (Offer* offer, utils::copy(slave->offers)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19d4b38..4d400c2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -185,6 +185,8 @@ message KillTaskMessage {
 }
 
 
+// NOTE: If 'pid' is present, scheduler driver sends an
+// acknowledgement to the pid.
 message StatusUpdateMessage {
   required StatusUpdate update = 1;
   optional string pid = 2;

http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 7bfe3b1..d74463f 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -431,18 +431,16 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
 
   // At this point, the slave still thinks it's registered, so we
   // simulate a status update coming from the slave.
-  StatusUpdateMessage statusUpdate;
-  statusUpdate.set_pid(stringify(slave.get()));
-  statusUpdate.mutable_update()->mutable_framework_id()->set_value(
-      frameworkId.get().value());
-  statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
-  statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
-  statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
-      "task_id");
-  statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
-  statusUpdate.mutable_update()->set_timestamp(Clock::now().secs());
-  statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
-  process::post(master.get(), statusUpdate);
+  TaskID taskId;
+  taskId.set_value("task_id");
+  const StatusUpdate& update = createStatusUpdate(
+      frameworkId.get(), slaveId, taskId, TASK_RUNNING);
+
+  StatusUpdateMessage message;
+  message.mutable_update()->CopyFrom(update);
+  message.set_pid(stringify(slave.get()));
+
+  process::post(master.get(), message);
 
   // The master should shutdown the slave upon receiving the update.
   AWAIT_READY(shutdownMessage);