You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:24 UTC
[01/18] git commit: Fixed slave to not re-register with master in
cleanup mode.
Updated Branches:
refs/heads/master 853c9bab4 -> ad2d66249
Fixed slave to not re-register with master in cleanup mode.
Review: https://reviews.apache.org/r/13442
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6da32671
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6da32671
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6da32671
Branch: refs/heads/master
Commit: 6da32671fe678a074dd3bab9c5b12850cabd6f7d
Parents: ab8d5d3
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 8 15:51:49 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:27 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 9 +++++++--
src/tests/slave_recovery_tests.cpp | 14 ++++++++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6da32671/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index dbc4473..b835ac7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -438,8 +438,13 @@ void Slave::_initialize(const Future<Nothing>& future)
// in 'cleanup' mode.
if (frameworks.empty() && flags.recover == "cleanup") {
terminate(self());
- } else {
- // Register with the master.
+ } else if (flags.recover == "reconnect") {
+ // Re-register if reconnecting.
+ // NOTE: Since the slave in cleanup mode never re-registers, if
+ // the master fails over it will not forward the updates from
+ // the "unknown" slave to the scheduler. This could lead to the
+ // slave waiting indefinitely for acknowledgements. The master's
+ // registrar could help in handling this correctly.
state = DISCONNECTED;
if (master) {
doReliableRegistration();
http://git-wip-us.apache.org/repos/asf/mesos/blob/6da32671/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index bd755f6..a0734c3 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -791,8 +791,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
Future<vector<Offer> > offers;
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
+ .WillOnce(FutureArg<1>(&offers));
driver.start();
@@ -815,10 +814,17 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
this->Stop(slave.get());
+ // Slave in cleanup mode shouldn't reregister with master and hence
+ // no offers should be made by the master.
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .Times(0);
+
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
+ Future<Nothing> _initialize = FUTURE_DISPATCH(_, &Slave::_initialize);
+
// Restart the slave in 'cleanup' recovery mode with a new isolator.
TypeParam isolator2;
@@ -839,6 +845,10 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
AWAIT_READY(status);
ASSERT_EQ(TASK_FAILED, status.get().state());
+ // Wait for recovery to complete.
+ AWAIT_READY(_initialize);
+ Clock::settle();
+
Clock::resume();
driver.stop();
[07/18] git commit: Changed slave state recovery to ignore absence of
files as safe.
Posted by vi...@apache.org.
Changed slave state recovery to ignore absence of files as safe.
Review: https://reviews.apache.org/r/13443
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba2ee7cf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba2ee7cf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba2ee7cf
Branch: refs/heads/master
Commit: ba2ee7cf1cab0a1edef573fe360b58e670e1d548
Parents: 24991c9
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Aug 8 19:41:01 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:57 2013 -0700
----------------------------------------------------------------------
src/slave/state.cpp | 54 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba2ee7cf/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 77b29dc..cd74e41 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -70,6 +70,13 @@ Try<SlaveState> SlaveState::recover(
// Read the slave info.
const string& path = paths::getSlaveInfoPath(rootDir, slaveId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died before it registered
+ // with the master.
+ LOG(WARNING) << "Failed to find slave info file '" << path << "'";
+ return state;
+ }
+
const Result<SlaveInfo>& slaveInfo = ::protobuf::read<SlaveInfo>(path);
if (!slaveInfo.isSome()) {
@@ -126,6 +133,14 @@ Try<FrameworkState> FrameworkState::recover(
// Read the framework info.
string path = paths::getFrameworkInfoPath(rootDir, slaveId, frameworkId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died after creating the
+ // framework directory but before it checkpointed the
+ // framework info.
+ LOG(WARNING) << "Failed to find framework info file '" << path << "'";
+ return state;
+ }
+
const Result<FrameworkInfo>& frameworkInfo =
::protobuf::read<FrameworkInfo>(path);
@@ -145,6 +160,13 @@ Try<FrameworkState> FrameworkState::recover(
// Read the framework pid.
path = paths::getFrameworkPidPath(rootDir, slaveId, frameworkId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died after creating the
+ // framework info but before it checkpointed the framework pid.
+ LOG(WARNING) << "Failed to framework pid file '" << path << "'";
+ return state;
+ }
+
const Try<string>& pid = os::read(path);
if (pid.isError()) {
@@ -205,6 +227,12 @@ Try<ExecutorState> ExecutorState::recover(
// Read the executor info.
const string& path =
paths::getExecutorInfoPath(rootDir, slaveId, frameworkId, executorId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died after creating the executor
+ // directory but before it checkpointed the executor info.
+ LOG(WARNING) << "Failed to find executor info file '" << path << "'";
+ return state;
+ }
const Result<ExecutorInfo>& executorInfo =
::protobuf::read<ExecutorInfo>(path);
@@ -330,6 +358,12 @@ Try<RunState> RunState::recover(
// Read the forked pid.
string path = paths::getForkedPidPath(
rootDir, slaveId, frameworkId, executorId, uuid);
+ if (!os::exists(path)) {
+ // This could happen if the slave died before the isolator
+ // checkpointed the forked pid.
+ LOG(WARNING) << "Failed to find executor forked pid file '" << path << "'";
+ return state;
+ }
Try<string> pid = os::read(path);
@@ -357,6 +391,14 @@ Try<RunState> RunState::recover(
path = paths::getLibprocessPidPath(
rootDir, slaveId, frameworkId, executorId, uuid);
+ if (!os::exists(path)) {
+ // This could happen if the slave died before the executor
+ // registered with the slave.
+ LOG(WARNING)
+ << "Failed to find executor libprocess pid file '" << path << "'";
+ return state;
+ }
+
pid = os::read(path);
if (pid.isError()) {
@@ -399,6 +441,12 @@ Try<TaskState> TaskState::recover(
// Read the task info.
string path = paths::getTaskInfoPath(
rootDir, slaveId, frameworkId, executorId, uuid, taskId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died after creating the task
+ // directory but before it checkpointed the task info.
+ LOG(WARNING) << "Failed to find task info file '" << path << "'";
+ return state;
+ }
const Result<Task>& task = ::protobuf::read<Task>(path);
@@ -419,6 +467,12 @@ Try<TaskState> TaskState::recover(
// Read the status updates.
path = paths::getTaskUpdatesPath(
rootDir, slaveId, frameworkId, executorId, uuid, taskId);
+ if (!os::exists(path)) {
+ // This could happen if the slave died before it checkpointed
+ // any status updates for this task.
+ LOG(WARNING) << "Failed to find status updates file '" << path << "'";
+ return state;
+ }
// Open the status updates file for reading and writing (for truncating).
const Try<int>& fd = os::open(path, O_RDWR);
[18/18] git commit: Fixed status update manager to properly handle
duplicate updates.
Posted by vi...@apache.org.
Fixed status update manager to properly handle duplicate updates.
Review: https://reviews.apache.org/r/13520
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad2d6624
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad2d6624
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad2d6624
Branch: refs/heads/master
Commit: ad2d662497afbe91ad62adb2f0b270e32bcad655
Parents: 3bc167a
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 17:38:08 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 11:03:02 2013 -0700
----------------------------------------------------------------------
src/slave/status_update_manager.cpp | 20 ++++-
src/slave/status_update_manager.hpp | 41 ++++++----
src/tests/status_update_manager_tests.cpp | 106 +++++++++++++++++++++++++
3 files changed, 149 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 6d4598e..b6afeb1 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -242,7 +242,7 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
cleanupStatusUpdateStream(task.id, framework.id);
} else {
// If a stream has pending updates after the replay,
- // send the first pending update
+ // send the first pending update.
const Result<StatusUpdate>& next = stream->next();
CHECK(!next.isError());
if (next.isSome()) {
@@ -317,11 +317,17 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
}
// Handle the status update.
- Try<Nothing> result = stream->update(update);
+ Try<bool> result = stream->update(update);
if (result.isError()) {
return Future<Nothing>::failed(result.error());
}
+ // We don't return a failed future here so that the slave can re-ack
+ // the duplicate update.
+ if (!result.get()) {
+ return Nothing();
+ }
+
// Forward the status update to the master if this is the first in the stream.
// Subsequent status updates will get sent in 'acknowledgement()'.
if (stream->pending.size() == 1) {
@@ -403,6 +409,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
return Future<bool>::failed(result.error());
}
+ if (!result.get()) {
+ return Future<bool>::failed("Duplicate acknowledgement");
+ }
+
// Reset the timeout.
stream->timeout = None();
@@ -412,7 +422,9 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
return Future<bool>::failed(next.error());
}
- if (stream->terminated) {
+ bool terminated = stream->terminated;
+
+ if (terminated) {
if (next.isSome()) {
LOG(WARNING) << "Acknowledged a terminal"
<< " status update " << update.get()
@@ -424,7 +436,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
stream->timeout = forward(next.get());
}
- return result.get();
+ return !terminated;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index ffc79ae..a032646 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -103,9 +103,9 @@ public:
// Checkpoints the status update to disk if necessary.
// Also, sends the next pending status update, if any.
- // @return true if the ACK is handled successfully (e.g., checkpointed)
+ // @return True if the ACK is handled successfully (e.g., checkpointed)
// and the task's status update stream is not terminated.
- // false same as above except the status update stream is terminated.
+ // False same as above except the status update stream is terminated.
// Failed if there are any errors (e.g., duplicate, checkpointing).
process::Future<bool> acknowledgement(
const TaskID& taskId,
@@ -200,7 +200,11 @@ struct StatusUpdateStream
}
}
- Try<Nothing> update(const StatusUpdate& update)
+ // This function handles the update, checkpointing if necessary.
+ // @return True if the update is successfully handled.
+ // False if the update is a duplicate.
+ // Error Any errors (e.g., checkpointing).
+ Try<bool> update(const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
@@ -212,7 +216,7 @@ struct StatusUpdateStream
if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring status update " << update
<< " that has already been acknowledged by the framework!";
- return Nothing();
+ return false;
}
// Check that this update hasn't already been received.
@@ -220,13 +224,22 @@ struct StatusUpdateStream
// then crashes after it writes it to disk but before it sends an ack.
if (received.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring duplicate status update " << update;
- return Nothing();
+ return false;
}
// Handle the update, checkpointing if necessary.
- return handle(update, StatusUpdateRecord::UPDATE);
+ Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
}
+ // This function handles the ACK, checkpointing if necessary.
+ // @return True if the acknowledgement is successfully handled.
+ // False if the acknowledgement is a duplicate.
+ // Error Any errors (e.g., checkpointing).
Try<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
@@ -238,18 +251,18 @@ struct StatusUpdateStream
}
if (acknowledged.contains(uuid)) {
- return Error("Duplicate status update acknowledgment (UUID: "
- + uuid.toString() + ") for update " + stringify(update));
+ LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+ << uuid << ") for update " << update;
+ return false;
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (uuid != UUID::fromBytes(update.uuid())) {
- return Error(
- "Unexpected status update acknowledgement (received " +
- uuid.toString() +
- ", expecting " + UUID::fromBytes(update.uuid()).toString() +
- ") for update " + stringify(update));
+ LOG(WARNING) << "Unexpected status update acknowledgement (received "
+ << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+ << ") for update " << update;
+ return false;
}
// Handle the ACK, checkpointing if necessary.
@@ -258,7 +271,7 @@ struct StatusUpdateStream
return Error(result.error());
}
- return !terminated;
+ return true;
}
// Returns the next update (or none, if empty) in the queue.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 6473478..cf420e4 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -660,3 +660,109 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
Shutdown();
}
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate status updates, when the second
+// update with the same UUID is received before the ACK for the
+// first update. The proper behavior here is for the status update
+// manager to drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true;
+
+ Try<PID<Slave> > slave = StartSlave(&exec, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Capture the first status update message.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Drop the first ACK from the scheduler to the slave.
+ Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+ DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+ Clock::pause();
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(statusUpdateMessage);
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+ Future<Nothing> _statusUpdate =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+ // Now resend the TASK_RUNNING update.
+ process::post(slave.get(), statusUpdateMessage.get());
+
+ // At this point the status update manager has handled
+ // the duplicate status update.
+ AWAIT_READY(_statusUpdate);
+
+ // After we advance the clock, the status update manager should
+ // retry the TASK_RUNNING update and the scheduler should receive
+ // and acknowledge it.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::settle();
+
+ // Ensure the scheduler receives the retried TASK_RUNNING update.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
[16/18] git commit: Fixed slave to not re-register with Command
Executors.
Posted by vi...@apache.org.
Fixed slave to not re-register with Command Executors.
Review: https://reviews.apache.org/r/13487
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ca371ce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ca371ce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ca371ce
Branch: refs/heads/master
Commit: 7ca371ce4ca33f2a189037b57e091c6ad0a5aa48
Parents: 755308d
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 18:56:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 44 +++++++++++++++++++++-----------------------
src/slave/slave.hpp | 2 ++
2 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca371ce/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 11e4826..803da8d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -705,13 +705,18 @@ void Slave::doReliableRegistration()
continue;
}
- // TODO(bmahler): Kill this in 0.15.0, as in 0.14.0 we've
- // added code into the Scheduler Driver to ensure the
- // framework id is set in ExecutorInfo, effectively making
- // it a required field.
- ExecutorInfo* executorInfo = message.add_executor_infos();
- executorInfo->MergeFrom(executor->info);
- executorInfo->mutable_framework_id()->MergeFrom(framework->id);
+ // Do not re-register with Command Executors because the
+ // master doesn't store them; they are generated by the slave.
+ if (!executor->commandExecutor) {
+ ExecutorInfo* executorInfo = message.add_executor_infos();
+ executorInfo->MergeFrom(executor->info);
+
+ // TODO(bmahler): Kill this in 0.15.0, as in 0.14.0 we've
+ // added code into the Scheduler Driver to ensure the
+ // framework id is set in ExecutorInfo, effectively making
+ // it a required field.
+ executorInfo->mutable_framework_id()->MergeFrom(framework->id);
+ }
// Add launched tasks.
// TODO(vinod): Use foreachvalue instead once LinkedHashmap
@@ -2135,8 +2140,6 @@ void Slave::executorTerminated(
monitor.unwatch(frameworkId, executorId)
.onAny(lambda::bind(_unwatch, lambda::_1, frameworkId, executorId));
- Option<bool> isCommandExecutor;
-
// Transition all live tasks to TASK_LOST/TASK_FAILED.
// If the isolator destroyed the executor (e.g., due to OOM event)
// or if this is a command executor, we send TASK_FAILED status updates
@@ -2155,8 +2158,7 @@ void Slave::executorTerminated(
foreach (Task* task, executor->launchedTasks.values()) {
if (!protobuf::isTerminalState(task->state())) {
mesos::TaskState taskState;
- isCommandExecutor = !task->has_executor_id();
- if (destroyed || isCommandExecutor.get()) {
+ if (destroyed || executor->commandExecutor) {
taskState = TASK_FAILED;
} else {
taskState = TASK_LOST;
@@ -2176,8 +2178,7 @@ void Slave::executorTerminated(
// supports it.
foreach (const TaskInfo& task, executor->queuedTasks.values()) {
mesos::TaskState taskState;
- isCommandExecutor = task.has_command();
- if (destroyed || isCommandExecutor.get()) {
+ if (destroyed || executor->commandExecutor) {
taskState = TASK_FAILED;
} else {
taskState = TASK_LOST;
@@ -2192,16 +2193,10 @@ void Slave::executorTerminated(
}
}
- // If we weren't able to figure out whether this executor is a
- // command executor above (e.g., no pending tasks), we deduce
- // it from the ExecutorInfo. This is a hack for now.
- if (isCommandExecutor.isNone()) {
- isCommandExecutor = strings::contains(
- executor->info.command().value(),
- path::join(flags.launcher_dir, "mesos-executor"));
- }
-
- if (!isCommandExecutor.get()) {
+ // Only send ExitedExecutorMessage if it is not a Command
+ // Executor because the master doesn't store them; they are
+ // generated by the slave.
+ if (!executor->commandExecutor) {
ExitedExecutorMessage message;
message.mutable_slave_id()->MergeFrom(info.id());
message.mutable_framework_id()->MergeFrom(frameworkId);
@@ -2998,6 +2993,9 @@ Executor::Executor(
uuid(_uuid),
directory(_directory),
checkpoint(_checkpoint),
+ commandExecutor(strings::contains(
+ info.command().value(),
+ path::join(slave->flags.launcher_dir, "mesos-executor"))),
pid(UPID()),
resources(_info.resources()),
completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
http://git-wip-us.apache.org/repos/asf/mesos/blob/7ca371ce/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 93fd32e..ef8b64f 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -375,6 +375,8 @@ struct Executor
const bool checkpoint;
+ const bool commandExecutor;
+
UPID pid;
Resources resources; // Currently consumed resources.
[02/18] git commit: Added checkpointing ability for example
frameworks.
Posted by vi...@apache.org.
Added checkpointing ability for example frameworks.
Review: https://reviews.apache.org/r/13398
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab8d5d3c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab8d5d3c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab8d5d3c
Branch: refs/heads/master
Commit: ab8d5d3c4a9d8504e8e3579692cc92d21ada7a46
Parents: 853c9ba
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 18:05:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:27 2013 -0700
----------------------------------------------------------------------
src/examples/java/TestFramework.java | 14 +++++++++++---
src/examples/long_lived_framework.cpp | 7 +++++++
src/examples/python/test_framework.py | 6 ++++++
src/examples/test_framework.cpp | 7 +++++++
4 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/java/TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java
index d130b08..687ed0d 100644
--- a/src/examples/java/TestFramework.java
+++ b/src/examples/java/TestFramework.java
@@ -145,10 +145,18 @@ public class TestFramework {
.setSource("java_test")
.build();
- FrameworkInfo framework = FrameworkInfo.newBuilder()
+ FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
.setUser("") // Have Mesos fill in the current user.
- .setName("Test Framework (Java)")
- .build();
+ .setName("Test Framework (Java)");
+
+ // TODO(vinod): Make checkpointing the default when it is default
+ // on the slave.
+ if (System.getenv("MESOS_CHECKPOINT") != null) {
+ System.out.println("Enabling checkpoint for the framework");
+ frameworkBuilder.setCheckpoint(true);
+ }
+
+ FrameworkInfo framework = frameworkBuilder.build();
MesosSchedulerDriver driver = args.length == 1
? new MesosSchedulerDriver(
http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/long_lived_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/long_lived_framework.cpp b/src/examples/long_lived_framework.cpp
index 9c86481..08ccc75 100644
--- a/src/examples/long_lived_framework.cpp
+++ b/src/examples/long_lived_framework.cpp
@@ -178,6 +178,13 @@ int main(int argc, char** argv)
framework.set_user(""); // Have Mesos fill in the current user.
framework.set_name("Long Lived Framework (C++)");
+ // TODO(vinod): Make checkpointing the default when it is default
+ // on the slave.
+ if (getenv("MESOS_CHECKPOINT")) {
+ cout << "Enabling checkpoint for the framework" << endl;
+ framework.set_checkpoint(true);
+ }
+
MesosSchedulerDriver driver(&scheduler, framework, argv[1]);
return driver.run() == DRIVER_STOPPED ? 0 : 1;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/python/test_framework.py
----------------------------------------------------------------------
diff --git a/src/examples/python/test_framework.py b/src/examples/python/test_framework.py
index 39dcb05..eb20127 100755
--- a/src/examples/python/test_framework.py
+++ b/src/examples/python/test_framework.py
@@ -129,6 +129,12 @@ if __name__ == "__main__":
framework.user = "" # Have Mesos fill in the current user.
framework.name = "Test Framework (Python)"
+ # TODO(vinod): Make checkpointing the default when it is default
+ # on the slave.
+ if os.getenv("MESOS_CHECKPOINT"):
+ print "Enabling checkpoint for the framework";
+ framework.checkpoint = True
+
driver = mesos.MesosSchedulerDriver(
TestScheduler(executor),
framework,
http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8d5d3c/src/examples/test_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp
index f91d57a..0065de1 100644
--- a/src/examples/test_framework.cpp
+++ b/src/examples/test_framework.cpp
@@ -210,6 +210,13 @@ int main(int argc, char** argv)
framework.set_name("Test Framework (C++)");
framework.set_role(role);
+ // TODO(vinod): Make checkpointing the default when it is default
+ // on the slave.
+ if (getenv("MESOS_CHECKPOINT")) {
+ cout << "Enabling checkpoint for the framework" << endl;
+ framework.set_checkpoint(true);
+ }
+
MesosSchedulerDriver driver(&scheduler, framework, master.get());
return driver.run() == DRIVER_STOPPED ? 0 : 1;
[05/18] git commit: Fixed slave to offer total disk by default.
Posted by vi...@apache.org.
Fixed slave to offer total disk by default.
Review: https://reviews.apache.org/r/13395
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24991c9d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24991c9d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24991c9d
Branch: refs/heads/master
Commit: 24991c9d03b1b08ba25aa329026dbd7e7b40f38e
Parents: fbbb648
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:54:58 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/24991c9d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index aa0dbd7..5fa1fa7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -176,11 +176,11 @@ void Slave::initialize()
if (!resources.disk().isSome()) {
Bytes disk;
- // NOTE: We calculate disk availability of the file system on
+ // NOTE: We calculate disk size of the file system on
// which the slave work directory is mounted.
- Try<Bytes> disk_ = fs::available(flags.work_dir);
+ Try<Bytes> disk_ = fs::size(flags.work_dir);
if (!disk_.isSome()) {
- LOG(WARNING) << "Failed to auto-detect the free disk space: '"
+ LOG(WARNING) << "Failed to auto-detect the disk space: '"
<< disk_.error()
<< "' ; defaulting to " << DEFAULT_DISK;
disk = DEFAULT_DISK;
[03/18] git commit: Added fs::size() to calculate the total disk
space and killed fs::available().
Posted by vi...@apache.org.
Added fs::size() to calculate the total disk space
and killed fs::available().
Review: https://reviews.apache.org/r/13394
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fbbb648a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fbbb648a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fbbb648a
Branch: refs/heads/master
Commit: fbbb648a461c2b4f5ba8ff89328ec1c3053690f9
Parents: 9ce2edd
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:53:58 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fbbb648a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
index e405f96..3a20e86 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/fs.hpp
@@ -16,14 +16,14 @@
// a struct, and move this back into os.hpp.
namespace fs {
-// Returns the total available disk size in bytes.
-inline Try<Bytes> available(const std::string& path = "/")
+// Returns the total disk size in bytes.
+inline Try<Bytes> size(const std::string& path = "/")
{
struct statvfs buf;
if (::statvfs(path.c_str(), &buf) < 0) {
return ErrnoError();
}
- return Bytes(buf.f_bavail * buf.f_frsize);
+ return Bytes(buf.f_blocks * buf.f_frsize);
}
[11/18] git commit: Improved task validation error messages in master.
Posted by vi...@apache.org.
Improved task validation error messages in master.
Review: https://reviews.apache.org/r/13454
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/930aca13
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/930aca13
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/930aca13
Branch: refs/heads/master
Commit: 930aca1367afa0ad139300b1f8f8ee2e7d871f00
Parents: 376cd66
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 16:18:05 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:42:32 2013 -0700
----------------------------------------------------------------------
src/common/type_utils.hpp | 8 +++++++
src/master/master.cpp | 36 ++++++++++++++++++--------------
src/slave/slave.cpp | 3 +++
src/tests/resource_offers_tests.cpp | 10 +++++----
4 files changed, 37 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 9320ced..674a882 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -89,6 +89,14 @@ inline std::ostream& operator << (std::ostream& stream, const SlaveInfo& slave)
}
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const ExecutorInfo& executor)
+{
+ return stream << executor.DebugString();
+}
+
+
inline bool operator == (const FrameworkID& left, const FrameworkID& right)
{
return left.value() == right.value();
http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6530008..2b60e32 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1515,12 +1515,11 @@ struct ResourceUsageChecker : TaskInfoVisitor
Resources taskResources = task.resources();
if (!((usedResources + taskResources) <= offer->resources())) {
- LOG(WARNING) << "Task " << task.task_id() << " attempted to use "
- << taskResources << " combined with already used "
- << usedResources << " is greater than offered "
- << offer->resources();
-
- return TaskInfoError::some("Task uses more resources than offered");
+ return TaskInfoError::some(
+ "Task " + stringify(task.task_id()) + " attempted to use " +
+ stringify(taskResources) + " combined with already used " +
+ stringify(usedResources) + " is greater than offered " +
+ stringify(offer->resources()));
}
// Check this task's executor's resources.
@@ -1530,9 +1529,9 @@ struct ResourceUsageChecker : TaskInfoVisitor
foreach (const Resource& resource, task.executor().resources()) {
if (!Resources::isAllocatable(resource)) {
// TODO(benh): Send back the invalid resources?
- LOG(WARNING) << "Executor for task " << task.task_id()
- << " uses invalid resources " << resource;
- return TaskInfoError::some("Task's executor uses invalid resources");
+ return TaskInfoError::some(
+ "Executor for task " + stringify(task.task_id()) +
+ " uses invalid resources " + stringify(resource));
}
}
@@ -1542,13 +1541,11 @@ struct ResourceUsageChecker : TaskInfoVisitor
if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
taskResources += task.executor().resources();
if (!((usedResources + taskResources) <= offer->resources())) {
- LOG(WARNING) << "Task " << task.task_id() << " + executor attempted"
- << " to use " << taskResources << " combined with"
- << " already used " << usedResources << " is greater"
- << " than offered " << offer->resources();
-
return TaskInfoError::some(
- "Task + executor uses more resources than offered");
+ "Task " + stringify(task.task_id()) + " + executor attempted" +
+ " to use " + stringify(taskResources) + " combined with" +
+ " already used " + stringify(usedResources) + " is greater" +
+ " than offered " + stringify(offer->resources()));
}
}
executors.insert(task.executor().executor_id());
@@ -1588,7 +1585,14 @@ struct ExecutorInfoChecker : TaskInfoVisitor
if (!(task.executor() == executorInfo)) {
return TaskInfoError::some(
"Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible)");
+ " with same ExecutorID is not compatible).\n"
+ "------------------------------------------------------------\n"
+ "Existing ExecutorInfo:\n" +
+ stringify(executorInfo) + "\n"
+ "------------------------------------------------------------\n"
+ "Task's ExecutorInfo:\n" +
+ stringify(task.executor()) + "\n"
+ "------------------------------------------------------------\n");
}
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e8176d2..83c250a 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2593,8 +2593,11 @@ Future<Nothing> Slave::recover(bool reconnect, bool strict)
if (reconnect && !(info == state.get().info.get())) {
EXIT(1)
<< "Incompatible slave info detected.\n"
+ << "------------------------------------------------------------\n"
<< "Old slave info:\n" << state.get().info.get() << "\n"
+ << "------------------------------------------------------------\n"
<< "New slave info:\n" << info << "\n"
+ << "------------------------------------------------------------\n"
<< "To properly upgrade the slave do as follows:\n"
<< "Step 1: Start the slave with --recover=cleanup.\n"
<< "Step 2: Wait till the slave kills all executors and shuts down.\n"
http://git-wip-us.apache.org/repos/asf/mesos/blob/930aca13/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index a96e775..3888e46 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -23,6 +23,8 @@
#include <mesos/executor.hpp>
#include <mesos/scheduler.hpp>
+#include <stout/strings.hpp>
+
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
@@ -252,7 +254,8 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered)
EXPECT_EQ(task.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
- EXPECT_EQ("Task uses more resources than offered", status.get().message());
+ EXPECT_TRUE(strings::contains(
+ status.get().message(), "greater than offered"));
driver.stop();
driver.join();
@@ -570,9 +573,8 @@ TEST_F(MultipleExecutorsTest, TasksExecutorInfoDiffers)
EXPECT_EQ(task2.task_id(), status.get().task_id());
EXPECT_EQ(TASK_LOST, status.get().state());
EXPECT_TRUE(status.get().has_message());
- EXPECT_EQ("Task has invalid ExecutorInfo (existing ExecutorInfo"
- " with same ExecutorID is not compatible)",
- status.get().message());
+ EXPECT_TRUE(strings::contains(
+ status.get().message(), "Task has invalid ExecutorInfo"));
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));
[12/18] git commit: Implemented a LinkedHashMap that preserves the
insertion order of keys of a hashmap.
Posted by vi...@apache.org.
Implemented a LinkedHashMap that preserves the insertion
order of keys of a hashmap.
Review: https://reviews.apache.org/r/13464
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a2ebf17f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a2ebf17f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a2ebf17f
Branch: refs/heads/master
Commit: a2ebf17f030697696f109db4391a6d9dc9225232
Parents: 930aca1
Author: Vinod Kone <vi...@twitter.com>
Authored: Sat Aug 10 01:13:55 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:49:29 2013 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/Makefile.am | 1 +
3rdparty/libprocess/3rdparty/stout/Makefile.am | 2 +
.../3rdparty/stout/include/stout/hashmap.hpp | 2 +
.../stout/include/stout/linkedhashmap.hpp | 92 +++++++++++++++++++
.../stout/tests/linkedhashmap_tests.cpp | 93 ++++++++++++++++++++
5 files changed, 190 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/Makefile.am b/3rdparty/libprocess/3rdparty/Makefile.am
index 0cd407c..e8561df 100644
--- a/3rdparty/libprocess/3rdparty/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/Makefile.am
@@ -123,6 +123,7 @@ stout_tests_SOURCES = \
$(STOUT)/tests/gzip_tests.cpp \
$(STOUT)/tests/hashset_tests.cpp \
$(STOUT)/tests/json_tests.cpp \
+ $(STOUT)/tests/linkedhashmap_tests.cpp \
$(STOUT)/tests/main.cpp \
$(STOUT)/tests/multimap_tests.cpp \
$(STOUT)/tests/none_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/Makefile.am b/3rdparty/libprocess/3rdparty/stout/Makefile.am
index e465fd1..0428aa8 100644
--- a/3rdparty/libprocess/3rdparty/stout/Makefile.am
+++ b/3rdparty/libprocess/3rdparty/stout/Makefile.am
@@ -26,6 +26,7 @@ EXTRA_DIST = \
include/stout/hashset.hpp \
include/stout/json.hpp \
include/stout/lambda.hpp \
+ include/stout/linkedhashmap.hpp \
include/stout/multihashmap.hpp \
include/stout/multimap.hpp \
include/stout/net.hpp \
@@ -66,6 +67,7 @@ EXTRA_DIST = \
tests/gzip_tests.cpp \
tests/hashset_tests.cpp \
tests/json_tests.cpp \
+ tests/linkedhashmap_tests.cpp \
tests/main.cpp \
tests/multimap_tests.cpp \
tests/none_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
index 796cb50..cea6988 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/hashmap.hpp
@@ -53,6 +53,7 @@ public:
}
// Returns the set of keys in this map.
+ // TODO(vinod/bmahler): Should return a list instead.
hashset<Key> keys() const
{
hashset<Key> result;
@@ -63,6 +64,7 @@ public:
}
// Returns the set of values in this map.
+ // TODO(vinod/bmahler): Should return a list instead.
hashset<Value> values() const
{
hashset<Value> result;
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
new file mode 100644
index 0000000..a27ec26
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -0,0 +1,92 @@
+#ifndef __STOUT_LINKEDHASHMAP_HPP__
+#define __STOUT_LINKEDHASHMAP_HPP__
+
+#include <list>
+#include <utility>
+
+#include <stout/hashmap.hpp>
+#include <stout/option.hpp>
+
+// Implementation of a hashmap that maintains the insertion order
+// of the keys. Note that re-insertion of a key (i.e., update)
+// doesn't update its insertion order.
+// TODO(vinod/bmahler): Consider extending from stout::hashmap and/or
+// having a compatible API with stout::hashmap.
+template <typename Key, typename Value>
+class LinkedHashMap
+{
+public:
+ typedef std::list<Key> list;
+ typedef hashmap<Key, std::pair<Value, typename list::iterator> > map;
+
+ Value& operator[] (const Key& key)
+ {
+ if (!values_.contains(key)) {
+ // Insert into the list and get the "pointer" into the list.
+ typename list::iterator i = keys_.insert(keys_.end(), key);
+ values_[key] = std::make_pair(Value(), i); // Store default value.
+ }
+ return values_[key].first;
+ }
+
+ Option<Value> get(const Key& key) const
+ {
+ if (values_.contains(key)) {
+ return values_.at(key).first;
+ }
+ return None();
+ }
+
+ bool contains(const Key& key) const
+ {
+ return values_.contains(key);
+ }
+
+ size_t erase(const Key& key)
+ {
+ if (values_.contains(key)) {
+ // Get the "pointer" into the list.
+ typename list::iterator i = values_[key].second;
+ keys_.erase(i);
+ return values_.erase(key);
+ }
+ return 0;
+ }
+
+ std::list<Key> keys() const
+ {
+ return keys_;
+ }
+
+ std::list<Value> values() const
+ {
+ std::list<Value> result;
+ foreach (const Key& key, keys_) {
+ result.push_back(values_.at(key).first);
+ }
+ return result;
+ }
+
+ size_t size() const
+ {
+ return keys_.size();
+ }
+
+ bool empty() const
+ {
+ return keys_.empty();
+ }
+
+ void clear()
+ {
+ values_.clear();
+ keys_.clear();
+ }
+
+private:
+ list keys_; // Keys ordered by the insertion order.
+ map values_; // Map of values and "pointers" to the linked list.
+};
+
+
+#endif // __STOUT_LINKEDHASHMAP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/a2ebf17f/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
new file mode 100644
index 0000000..aca97ca
--- /dev/null
+++ b/3rdparty/libprocess/3rdparty/stout/tests/linkedhashmap_tests.cpp
@@ -0,0 +1,93 @@
+#include <stdint.h>
+
+#include <gtest/gtest.h>
+
+#include <list>
+#include <string>
+
+#include <stout/gtest.hpp>
+#include <stout/linkedhashmap.hpp>
+
+using std::list;
+using std::string;
+
+TEST(LinkedHashmapTest, Put)
+{
+ LinkedHashMap<string, int> map;
+
+ map["foo"] = 1;
+ ASSERT_SOME_EQ(1, map.get("foo"));
+ ASSERT_EQ(1, map.size());
+
+ map["bar"] = 2;
+ ASSERT_SOME_EQ(2, map.get("bar"));
+ ASSERT_EQ(2, map.size());
+
+ map["foo"] = 3;
+ ASSERT_SOME_EQ(3, map.get("foo"));
+ ASSERT_EQ(2, map.size());
+}
+
+
+TEST(LinkedHashmapTest, Contains)
+{
+ LinkedHashMap<string, int> map;
+ map["foo"] = 1;
+ map["bar"] = 2;
+ ASSERT_TRUE(map.contains("foo"));
+ ASSERT_TRUE(map.contains("bar"));
+ ASSERT_FALSE(map.contains("caz"));
+}
+
+
+TEST(LinkedHashmapTest, Erase)
+{
+ LinkedHashMap<string, int> map;
+
+ map["foo"] = 1;
+ map["bar"] = 2;
+ ASSERT_EQ(2, map.size());
+
+ ASSERT_EQ(1, map.erase("foo"));
+ ASSERT_EQ(0, map.erase("caz")); // Non-existent key.
+ ASSERT_NONE(map.get("foo"));
+ ASSERT_EQ(1, map.size());
+ ASSERT_SOME_EQ(2, map.get("bar"));
+}
+
+
+TEST(LinkedHashmapTest, Keys)
+{
+ LinkedHashMap<string, int> map;
+
+ std::list<string> keys;
+ keys.push_back("foo");
+ keys.push_back("bar");
+ keys.push_back("food");
+ keys.push_back("rad");
+ keys.push_back("cat");
+
+ // Insert keys into the map.
+ foreach (const string& key, keys) {
+ map[key] = 1;
+ }
+ map["foo"] = 1; // Re-insert a key.
+
+ // Ensure the keys returned are the same as insertion order.
+ ASSERT_EQ(keys, map.keys());
+}
+
+
+TEST(LinkedHashmapTest, Values)
+{
+ LinkedHashMap<string, int> map;
+
+ map["foo"] = 1;
+ map["bar"] = 2;
+ map["caz"] = 3;
+
+ int val = 0;
+ foreach (int value, map.values()) {
+ ASSERT_EQ(++val, value);
+ }
+}
[15/18] git commit: Fixed slave to always call
Isolator::resourcesChanged() when an executor re-registers.
Posted by vi...@apache.org.
Fixed slave to always call Isolator::resourcesChanged() when an
executor re-registers.
Review: https://reviews.apache.org/r/13503
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e79eef3f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e79eef3f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e79eef3f
Branch: refs/heads/master
Commit: e79eef3fe8e961f5236aaeec1998006d0e200ed6
Parents: 7ca371c
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 12:07:37 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e79eef3f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 803da8d..cf9f292 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1648,6 +1648,13 @@ void Slave::reregisterExecutor(
statusUpdate(update); // This also updates the executor's resources!
}
+ // Tell the isolator to update the resources.
+ dispatch(isolator,
+ &Isolator::resourcesChanged,
+ frameworkId,
+ executorId,
+ executor->resources);
+
// Now, if there is any task still in STAGING state and not in
// 'tasks' known to the executor, the slave must have died
// before the executor received the task! Relaunch it!
[14/18] git commit: Fixed executor driver to store tasks and updates
in LinkedHashMap.
Posted by vi...@apache.org.
Fixed executor driver to store tasks and updates in LinkedHashMap.
Review: https://reviews.apache.org/r/13486
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/755308dc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/755308dc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/755308dc
Branch: refs/heads/master
Commit: 755308dca1976b2df08ef264108fff0e7fdc6aa9
Parents: 010fa31
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 17:58:50 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:40 2013 -0700
----------------------------------------------------------------------
src/exec/exec.cpp | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/755308dc/src/exec/exec.cpp
----------------------------------------------------------------------
diff --git a/src/exec/exec.cpp b/src/exec/exec.cpp
index d467724..1a0dd07 100644
--- a/src/exec/exec.cpp
+++ b/src/exec/exec.cpp
@@ -33,7 +33,7 @@
#include <process/protobuf.hpp>
#include <stout/duration.hpp>
-#include <stout/hashmap.hpp>
+#include <stout/linkedhashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/fatal.hpp>
#include <stout/numify.hpp>
@@ -246,12 +246,16 @@ protected:
message.mutable_framework_id()->MergeFrom(frameworkId);
// Send all unacknowledged updates.
- foreachvalue (const StatusUpdate& update, updates) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const StatusUpdate& update, updates.values()) {
message.add_updates()->MergeFrom(update);
}
// Send all unacknowledged tasks.
- foreachvalue (const TaskInfo& task, tasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, tasks.values()) {
message.add_tasks()->MergeFrom(task);
}
@@ -494,13 +498,13 @@ private:
const string directory;
bool checkpoint;
- hashmap<UUID, StatusUpdate> updates; // Unacknowledged updates.
+ LinkedHashMap<UUID, StatusUpdate> updates; // Unacknowledged updates.
// We store tasks that have not been acknowledged
// (via status updates) by the slave. This ensures that, during
// recovery, the slave relaunches only those tasks that have
// never reached this executor.
- hashmap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+ LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
};
} // namespace internal {
[06/18] git commit: Improved the allocator to not offer
non-checkpointing slave resources to checkpointing frameworks.
Posted by vi...@apache.org.
Improved the allocator to not offer non-checkpointing slave resources
to checkpointing frameworks.
Review: https://reviews.apache.org/r/13407
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ea6c766b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ea6c766b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ea6c766b
Branch: refs/heads/master
Commit: ea6c766b57b1b59e218e4fc47befede762e9231a
Parents: 6da3267
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 22:46:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700
----------------------------------------------------------------------
src/master/hierarchical_allocator_process.hpp | 25 ++++++++++++++++------
src/master/master.cpp | 15 ++++---------
src/tests/slave_recovery_tests.cpp | 17 ++++++++-------
3 files changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/master/hierarchical_allocator_process.hpp
----------------------------------------------------------------------
diff --git a/src/master/hierarchical_allocator_process.hpp b/src/master/hierarchical_allocator_process.hpp
index 76465eb..183b205 100644
--- a/src/master/hierarchical_allocator_process.hpp
+++ b/src/master/hierarchical_allocator_process.hpp
@@ -61,6 +61,7 @@ struct Slave
Slave(const SlaveInfo& _info)
: available(_info.resources()),
whitelisted(false),
+ checkpoint(_info.checkpoint()),
info(_info) {}
Resources resources() const { return info.resources(); }
@@ -74,6 +75,7 @@ struct Slave
// frameworks.
bool whitelisted;
+ bool checkpoint;
private:
SlaveInfo info;
};
@@ -84,13 +86,15 @@ struct Framework
Framework() {}
Framework(const FrameworkInfo& _info)
- : info(_info) {}
+ : checkpoint(_info.checkpoint()),
+ info(_info) {}
std::string role() const { return info.role(); }
// Filters that have been added by this framework.
hashset<Filter*> filters;
+ bool checkpoint;
private:
FrameworkInfo info;
};
@@ -773,19 +777,28 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
const SlaveID& slaveId,
const Resources& resources)
{
- bool filtered = false;
-
CHECK(frameworks.contains(frameworkId));
+ CHECK(slaves.contains(slaveId));
+
+ // Do not offer a non-checkpointing slave's resources to a checkpointing
+ // framework. This is a short term fix until the following is resolved:
+ // https://issues.apache.org/jira/browse/MESOS-444.
+ if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
+ VLOG(1) << "Filtered " << resources
+ << " on non-checkpointing slave " << slaveId
+ << " for checkpointing framework " << frameworkId;
+ return true;
+ }
+
foreach (Filter* filter, frameworks[frameworkId].filters) {
if (filter->filter(slaveId, resources)) {
VLOG(1) << "Filtered " << resources
<< " on slave " << slaveId
<< " for framework " << frameworkId;
- filtered = true;
- break;
+ return true;
}
}
- return filtered;
+ return false;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b6d12a3..0675b52 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1349,17 +1349,10 @@ void Master::offer(const FrameworkID& frameworkId,
Slave* slave = slaves[slaveId];
- // Do not offer a non-checkpointing slave's resources to a checkpointing
- // framework. This is a short term fix until the following is resolved:
- // https://issues.apache.org/jira/browse/MESOS-444.
- if (framework->info.checkpoint() && !slave->info.checkpoint()) {
- LOG(WARNING) << "Master returning resources offered to checkpointing "
- << "framework " << frameworkId << " because slave "
- << slaveId << " is not checkpointing";
-
- allocator->resourcesRecovered(frameworkId, slaveId, offered);
- continue;
- }
+ CHECK(slave->info.checkpoint() || !framework->info.checkpoint())
+ << "Resources of non checkpointing slave " << slaveId
+ << " (" << slave->info.hostname() << ") are being offered to"
+ << " checkpointing framework " << frameworkId;
// This could happen if the allocator dispatched 'Master::offer' before
// it received 'Allocator::slaveRemoved' from the master.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ea6c766b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index a0734c3..6de8108 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1034,9 +1034,16 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
slave::Flags flags = this->CreateSlaveFlags();
flags.checkpoint = false;
+ Clock::pause();
+
+ Future<RegisterSlaveMessage> registerSlaveMessage =
+ FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
Try<PID<Slave> > slave = this->StartSlave(&isolator, flags);
ASSERT_SOME(slave);
+ AWAIT_READY(registerSlaveMessage);
+
MockScheduler sched;
// Enable checkpointing for the framework.
@@ -1053,17 +1060,11 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpointingSlave)
EXPECT_CALL(sched, resourceOffers(_, _))
.Times(0); // No offers should be received!
- Future<Nothing> offer = FUTURE_DISPATCH(_, &Master::offer);
-
- Clock::pause();
-
driver.start();
- AWAIT_READY(registered);
-
- // Wait for an offer to be made. We do a Clock::settle() here
+ // Wait for scheduler to register. We do a Clock::settle() here
// to ensure that no offers are received by the scheduler.
- AWAIT_READY(offer);
+ AWAIT_READY(registered);
Clock::settle();
driver.stop();
[04/18] git commit: Fixed slave to always delete the latest slave
symlink on shutdown or cleanup.
Posted by vi...@apache.org.
Fixed slave to always delete the latest slave symlink on
shutdown or cleanup.
Review: https://reviews.apache.org/r/13392
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ce2edda
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ce2edda
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ce2edda
Branch: refs/heads/master
Commit: 9ce2edda4f1daf2b9e68bcc2ffe2e1a0c072ed73
Parents: ea6c766
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Aug 7 16:29:26 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:09:28 2013 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ce2edda/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b835ac7..aa0dbd7 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -474,12 +474,13 @@ void Slave::finalize()
}
}
- if (flags.checkpoint &&
- (state == TERMINATING || flags.recover == "cleanup")) {
+ if (state == TERMINATING || flags.recover == "cleanup") {
// We remove the "latest" symlink in meta directory, so that the
// slave doesn't recover the state when it restarts and registers
// as a new slave with the master.
- CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+ if (os::exists(paths::getLatestSlavePath(metaDir))) {
+ CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+ }
}
// Stop the isolator.
@@ -1143,8 +1144,8 @@ void Slave::shutdownFramework(const FrameworkID& frameworkId)
// its a message from the currently registered master.
if (from && from != master) {
LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
- << " from " << from << "because it is not from the registered "
- << "master (" << master << ")";
+ << " from " << from << " because it is not from the registered"
+ << " master (" << master << ")";
return;
}
[13/18] git commit: Fixed slave to use LinkedHashMap to store tasks.
Posted by vi...@apache.org.
Fixed slave to use LinkedHashMap to store tasks.
Review: https://reviews.apache.org/r/13485
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/010fa314
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/010fa314
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/010fa314
Branch: refs/heads/master
Commit: 010fa31495cbede6528f2a576497e9b890fcebfb
Parents: a2ebf17
Author: Vinod Kone <vi...@twitter.com>
Authored: Sun Aug 11 17:09:17 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:56:36 2013 -0700
----------------------------------------------------------------------
src/slave/http.cpp | 18 +++++++++++++-----
src/slave/slave.cpp | 40 ++++++++++++++++++++++++++++++----------
src/slave/slave.hpp | 7 ++++---
3 files changed, 47 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index c45dfb4..073d092 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -204,22 +204,30 @@ JSON::Object model(const Executor& executor)
object.values["resources"] = model(executor.resources);
JSON::Array tasks;
- foreachvalue (Task* task, executor.launchedTasks) {
+ foreach (Task* task, executor.launchedTasks.values()) {
tasks.values.push_back(model(*task));
}
object.values["tasks"] = tasks;
JSON::Array queued;
- foreachvalue (const TaskInfo& task, executor.queuedTasks) {
+ foreach (const TaskInfo& task, executor.queuedTasks.values()) {
queued.values.push_back(model(task));
}
object.values["queued_tasks"] = queued;
- JSON::Array completedTasks;
+ JSON::Array completed;
foreach (const Task& task, executor.completedTasks) {
- completedTasks.values.push_back(model(task));
+ completed.values.push_back(model(task));
}
- object.values["completed_tasks"] = completedTasks;
+
+ // NOTE: We add 'terminatedTasks' to 'completed_tasks' for
+ // simplicity.
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor.terminatedTasks.values()) {
+ completed.values.push_back(model(*task));
+ }
+ object.values["completed_tasks"] = completed;
return object;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 83c250a..11e4826 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -714,12 +714,16 @@ void Slave::doReliableRegistration()
executorInfo->mutable_framework_id()->MergeFrom(framework->id);
// Add launched tasks.
- foreachvalue (Task* task, executor->launchedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->launchedTasks.values()) {
message.add_tasks()->CopyFrom(*task);
}
// Add queued tasks.
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
const Task& t = protobuf::createTask(
task, TASK_STAGING, executor->id, framework->id);
@@ -727,7 +731,9 @@ void Slave::doReliableRegistration()
}
// Add terminated tasks.
- foreachvalue (Task* task, executor->terminatedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->terminatedTasks.values()) {
message.add_tasks()->CopyFrom(*task);
}
}
@@ -1509,7 +1515,9 @@ void Slave::registerExecutor(
}
// First account for the tasks we're about to start.
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
// Add the task to the executor.
executor->addTask(task);
}
@@ -1534,7 +1542,9 @@ void Slave::registerExecutor(
message.mutable_slave_info()->MergeFrom(info);
send(executor->pid, message);
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
LOG(INFO) << "Flushing queued task " << task.task_id()
<< " for executor '" << executor->id << "'"
<< " of framework " << framework->id;
@@ -1641,7 +1651,9 @@ void Slave::reregisterExecutor(
launched[task.task_id()] = task;
}
- foreachvalue (Task* task, executor->launchedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->launchedTasks.values()) {
if (task->state() == TASK_STAGING &&
!launched.contains(task->task_id())) {
@@ -2138,7 +2150,9 @@ void Slave::executorTerminated(
StatusUpdate update;
// Transition all live launched tasks.
- foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, executor->launchedTasks.values()) {
if (!protobuf::isTerminalState(task->state())) {
mesos::TaskState taskState;
isCommandExecutor = !task->has_executor_id();
@@ -2158,8 +2172,9 @@ void Slave::executorTerminated(
}
// Transition all queued tasks.
- foreachvalue (const TaskInfo& task,
- utils::copy(executor->queuedTasks)) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (const TaskInfo& task, executor->queuedTasks.values()) {
mesos::TaskState taskState;
isCommandExecutor = task.has_command();
if (destroyed || isCommandExecutor.get()) {
@@ -3006,7 +3021,12 @@ Executor::Executor(
Executor::~Executor()
{
// Delete the tasks.
- foreachvalue (Task* task, launchedTasks) {
+ // TODO(vinod): Use foreachvalue instead once LinkedHashmap
+ // supports it.
+ foreach (Task* task, launchedTasks.values()) {
+ delete task;
+ }
+ foreach (Task* task, terminatedTasks.values()) {
delete task;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/010fa314/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 464d224..93fd32e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -34,6 +34,7 @@
#include <process/protobuf.hpp>
#include <stout/bytes.hpp>
+#include <stout/linkedhashmap.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/multihashmap.hpp>
@@ -378,9 +379,9 @@ struct Executor
Resources resources; // Currently consumed resources.
- hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
- hashmap<TaskID, Task*> launchedTasks; // Running.
- hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+ LinkedHashMap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+ LinkedHashMap<TaskID, Task*> launchedTasks; // Running.
+ LinkedHashMap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
private:
[17/18] git commit: Fixed master to properly reconcile completed
frameworks when slave re-registers.
Posted by vi...@apache.org.
Fixed master to properly reconcile completed frameworks when
slave re-registers.
Review: https://reviews.apache.org/r/13508
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3bc167a8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3bc167a8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3bc167a8
Branch: refs/heads/master
Commit: 3bc167a8137a13287104ff884e1b940fbf95ae4f
Parents: e79eef3
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 15:01:22 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 10:57:42 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 26 ++++++++-
src/tests/slave_recovery_tests.cpp | 93 +++++++++++++++++++++++++++++++++
2 files changed, 118 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bc167a8/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 2b60e32..d53b8bb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -54,7 +54,7 @@ using process::wait; // Necessary on some OS's to disambiguate.
using std::tr1::cref;
using std::tr1::bind;
-
+using std::tr1::shared_ptr;
namespace mesos {
namespace internal {
@@ -1775,6 +1775,9 @@ Resources Master::launchTask(const TaskInfo& task,
}
+// NOTE: This function is only called when the slave re-registers
+// with a master that already knows about it (i.e., not a failed
+// over master).
void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
{
CHECK_NOTNULL(slave);
@@ -1824,6 +1827,27 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
send(slave->pid, message);
}
}
+
+ // Send ShutdownFrameworkMessages for frameworks that are completed.
+ // This could happen if the message wasn't received by the slave
+ // (e.g., slave was down, partitioned).
+ // NOTE: This is a short-term hack because this information is lost
+ // when the master fails over. Also, 'completedFrameworks' has a
+ // limited capacity.
+ // TODO(vinod): Revisit this when registrar is in place. It would
+ // likely involve storing this information in the registrar.
+ foreach (const shared_ptr<Framework>& framework, completedFrameworks) {
+ if (slaveTasks.contains(framework->id)) {
+ LOG(WARNING)
+ << "Slave " << slave->id << " (" << slave->info.hostname()
+ << ") re-registered with completed framework " << framework->id
+ << ". Shutting down the framework on the slave";
+
+ ShutdownFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ send(slave->pid, message);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3bc167a8/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index ef6dbf7..28a2628 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1601,3 +1601,96 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
this->Shutdown(); // Shutdown before isolator(s) get deallocated.
}
+
+
+// This test verifies that when the slave recovers and re-registers
+// with a framework that was shut down while the slave was down, it gets
+// a ShutdownFramework message.
+TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ Future<RegisterSlaveMessage> registerSlaveMessage =
+ FUTURE_PROTOBUF(RegisterSlaveMessage(), _, _);
+
+ TypeParam isolator1;
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Try<PID<Slave> > slave = this->StartSlave(&isolator1, flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(registerSlaveMessage);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ // Capture the slave and framework ids.
+ SlaveID slaveId = offers.get()[0].slave_id();
+ FrameworkID frameworkId = offers.get()[0].framework_id();
+
+ EXPECT_CALL(sched, statusUpdate(_, _)); // TASK_RUNNING
+
+ Future<Nothing> _statusUpdateAcknowledgement =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ vector<TaskInfo> tasks;
+ tasks.push_back(task); // Long-running task
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Wait for TASK_RUNNING update to be acknowledged.
+ AWAIT_READY(_statusUpdateAcknowledgement);
+
+ this->Stop(slave.get());
+
+ Future<UnregisterFrameworkMessage> unregisterFrameworkMessage =
+ FUTURE_PROTOBUF(UnregisterFrameworkMessage(), _, _);
+
+ // Now stop the framework.
+ driver.stop();
+ driver.join();
+
+ // Wait until the framework is removed.
+ AWAIT_READY(unregisterFrameworkMessage);
+
+ Future<ShutdownFrameworkMessage> shutdownFrameworkMessage =
+ FUTURE_PROTOBUF(ShutdownFrameworkMessage(), _, _);
+
+ Future<Nothing> executorTerminated =
+ FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+ // Now restart the slave (use same flags) with a new isolator.
+ TypeParam isolator2;
+
+ slave = this->StartSlave(&isolator2, flags);
+ ASSERT_SOME(slave);
+
+ // Slave should get a ShutdownFrameworkMessage.
+ AWAIT_READY(shutdownFrameworkMessage);
+
+ // Ensure that the executor is terminated.
+ AWAIT_READY(executorTerminated);
+
+ this->Shutdown(); // Shutdown before isolator(s) get deallocated.
+}
[10/18] git commit: Fixed cgroups isolator to do setsid on the
executor process.
Posted by vi...@apache.org.
Fixed cgroups isolator to do setsid on the executor process.
Review: https://reviews.apache.org/r/13449
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/376cd66c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/376cd66c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/376cd66c
Branch: refs/heads/master
Commit: 376cd66c4a6b71d5444acf615db1fc098747da75
Parents: 649295f
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 15:00:16 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:36:00 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 51 ++++++++++++++++++++++++++++++++-
src/slave/process_isolator.cpp | 2 --
src/tests/slave_recovery_tests.cpp | 6 ++++
3 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 3427c62..d4ccd11 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -532,6 +532,19 @@ void CgroupsIsolator::launchExecutor(
// Start listening on OOM events.
oomListen(frameworkId, executorId);
+ // Use pipes to determine which child has successfully changed session.
+ int pipes[2];
+ if (pipe(pipes) < 0) {
+ PLOG(FATAL) << "Failed to create a pipe";
+ }
+
+ // Set the FD_CLOEXEC flags on these pipes
+ Try<Nothing> cloexec = os::cloexec(pipes[0]);
+ CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[0]";
+
+ cloexec = os::cloexec(pipes[1]);
+ CHECK_SOME(cloexec) << "Error setting FD_CLOEXEC on pipe[1]";
+
// Launch the executor using fork-exec.
pid_t pid;
if ((pid = ::fork()) == -1) {
@@ -539,6 +552,15 @@ void CgroupsIsolator::launchExecutor(
}
if (pid > 0) {
+ os::close(pipes[1]);
+
+ // Get the child's pid via the pipe.
+ if (read(pipes[0], &pid, sizeof(pid)) == -1) {
+ PLOG(FATAL) << "Failed to get child PID from pipe";
+ }
+
+ os::close(pipes[0]);
+
// In parent process.
LOG(INFO) << "Forked executor at = " << pid;
@@ -558,7 +580,34 @@ void CgroupsIsolator::launchExecutor(
executorId,
pid);
} else {
- // In child process.
+ // In child process, we make cleanup easier by putting process
+ // into its own session. DO NOT USE GLOG!
+ os::close(pipes[0]);
+
+ // NOTE: We setsid() in a loop because setsid() might fail if another
+ // process has the same process group id as the calling process.
+ while ((pid = setsid()) == -1) {
+ perror("Could not put executor in its own session");
+
+ std::cout << "Forking another process and retrying ..." << std::endl;
+
+ if ((pid = fork()) == -1) {
+ perror("Failed to fork to launch executor");
+ abort();
+ }
+
+ if (pid > 0) {
+ // In parent process.
+ exit(0);
+ }
+ }
+
+ if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
+ perror("Failed to write PID on pipe");
+ abort();
+ }
+
+ os::close(pipes[1]);
launcher::ExecutorLauncher launcher(
slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a80b047..24a7fb2 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -191,8 +191,6 @@ void ProcessIsolator::launchExecutor(
if (pid > 0) {
// In parent process.
- // It is ok to suicide here, though process reaper signals the exit,
- // because the process isolator ignores unknown processes.
exit(0);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/376cd66c/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 6de8108..ef6dbf7 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -734,6 +734,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+ EXPECT_CALL(sched, offerRescinded(_, _))
+ .Times(AtMost(1));
+
Future<Nothing> schedule = FUTURE_DISPATCH(
_, &GarbageCollectorProcess::schedule);
@@ -1366,6 +1369,9 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+ EXPECT_CALL(sched, offerRescinded(_, _))
+ .Times(AtMost(1));
+
Future<Nothing> executorTerminated =
FUTURE_DISPATCH(_, &Slave::executorTerminated);
[08/18] git commit: Fixed slave to not recover terminated executors.
Posted by vi...@apache.org.
Fixed slave to not recover terminated executors.
Review: https://reviews.apache.org/r/13450
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/649295f3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/649295f3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/649295f3
Branch: refs/heads/master
Commit: 649295f34d6b7a70314a52cd6db3f74208941b98
Parents: fd3584a
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 14:27:02 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700
----------------------------------------------------------------------
src/slave/monitor.cpp | 2 +-
src/slave/slave.cpp | 13 +++++++++++--
src/slave/status_update_manager.cpp | 13 ++++++-------
src/slave/status_update_manager.hpp | 2 +-
4 files changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 4f3c91f..8e1eb35 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -183,7 +183,7 @@ void ResourceMonitorProcess::_collect(
} else {
// Note that the isolator might have been terminated and pending
// dispatches deleted, causing the future to get discarded.
- LOG(WARNING)
+ VLOG(1)
<< "Failed to collect resource usage for executor '" << executorId
<< "' of framework '" << frameworkId << "': "
<< (statistics.isFailed() ? statistics.failure() : "Future discarded");
http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5fa1fa7..e8176d2 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2348,8 +2348,8 @@ void _unwatch(
{
if (!unwatch.isReady()) {
LOG(ERROR) << "Failed to unwatch executor " << executorId
- << " of framework " << frameworkId
- << ": " << unwatch.isFailed() ? unwatch.failure() : "discarded";
+ << " of framework " << frameworkId << ": "
+ << (unwatch.isFailed() ? unwatch.failure() : "discarded");
}
}
@@ -2619,6 +2619,15 @@ Future<Nothing> Slave::_recover(const SlaveState& state, bool reconnect)
{
foreachvalue(Framework* framework, frameworks){
foreachvalue(Executor* executor, framework->executors) {
+ // If the executor is already terminating/terminated don't
+ // bother reconnecting or killing it. This could happen if
+ // the recovered isolator sent an 'ExecutorTerminated' message
+ // before the slave reached this point.
+ if (executor->state == Executor::TERMINATING ||
+ executor->state == Executor::TERMINATED) {
+ continue;
+ }
+
// Monitor the executor.
monitor.watch(
framework->id,
http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index ffd4736..6d4598e 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -297,8 +297,7 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
const TaskID& taskId = update.status().task_id();
const FrameworkID& frameworkId = update.framework_id();
- LOG(INFO) << "Received status update " << update
- << " with checkpoint=" << stringify(checkpoint);
+ LOG(INFO) << "Received status update " << update;
// Write the status update to disk and enqueue it to send it to the master.
// Create/Get the status update stream for this task.
@@ -457,8 +456,8 @@ StatusUpdateStream* StatusUpdateManagerProcess::createStatusUpdateStream(
const Option<ExecutorID>& executorId,
const Option<UUID>& uuid)
{
- LOG(INFO) << "Creating StatusUpdate stream for task " << taskId
- << " of framework " << frameworkId;
+ VLOG(1) << "Creating StatusUpdate stream for task " << taskId
+ << " of framework " << frameworkId;
StatusUpdateStream* stream = new StatusUpdateStream(
taskId, frameworkId, slaveId, flags, checkpoint, executorId, uuid);
@@ -488,9 +487,9 @@ void StatusUpdateManagerProcess::cleanupStatusUpdateStream(
const TaskID& taskId,
const FrameworkID& frameworkId)
{
- LOG(INFO) << "Cleaning up status update stream"
- << " for task " << taskId
- << " of framework " << frameworkId;
+ VLOG(1) << "Cleaning up status update stream"
+ << " for task " << taskId
+ << " of framework " << frameworkId;
CHECK(streams.contains(frameworkId))
<< "Cannot find the status update streams for framework " << frameworkId;
http://git-wip-us.apache.org/repos/asf/mesos/blob/649295f3/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index da92760..ffc79ae 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -285,7 +285,7 @@ struct StatusUpdateStream
return Error(error.get());
}
- LOG(INFO) << "Replaying status update stream for task " << taskId;
+ VLOG(1) << "Replaying status update stream for task " << taskId;
foreach (const StatusUpdate& update, updates) {
// Handle the update.
[09/18] git commit: Fixed master to properly handle TASK_LOST updates
generated by it.
Posted by vi...@apache.org.
Fixed master to properly handle TASK_LOST updates generated by it.
Review: https://reviews.apache.org/r/13446
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd3584a7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd3584a7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd3584a7
Branch: refs/heads/master
Commit: fd3584a71f68015fcfff70cc889b32fac28f941b
Parents: ba2ee7c
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Aug 9 11:08:15 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue Aug 13 14:35:59 2013 -0700
----------------------------------------------------------------------
src/master/master.cpp | 121 ++++++++++++++-----------------
src/messages/messages.proto | 2 +
src/tests/fault_tolerance_tests.cpp | 22 +++---
3 files changed, 68 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0675b52..6530008 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -875,6 +875,9 @@ void Master::launchTasks(const FrameworkID& frameworkId,
status->set_message("Task launched with invalid offer");
update->set_timestamp(Clock::now().secs());
update->set_uuid(UUID::random().toBytes());
+
+ LOG(INFO) << "Sending status update " << *update
+ << " for launch task attempt on invalid offer " << offerId;
send(framework->pid, message);
}
}
@@ -1163,10 +1166,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// NOTE: We cannot use 'from' here to identify the slave as this is
// now sent by the StatusUpdateManagerProcess. Only 'pid' can
// be used to identify the slave.
- LOG(INFO) << "Status update from " << pid
- << ": task " << status.task_id()
- << " of framework " << update.framework_id()
- << " is now in state " << status.state();
+ LOG(INFO) << "Status update " << update << " from " << pid;
Slave* slave = getSlave(update.slave_id());
if (slave == NULL) {
@@ -1174,12 +1174,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// If the slave is deactivated, we have already informed
// frameworks that its tasks were LOST, so the slave should
// shut down.
- LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from deactivated slave " << pid
<< " with id " << update.slave_id() << " ; asking slave "
<< " to shutdown";
send(pid, ShutdownMessage());
} else {
- LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from unknown slave " << pid
<< " with id " << update.slave_id();
}
stats.invalidStatusUpdates++;
@@ -1190,7 +1192,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
Framework* framework = getFramework(update.framework_id());
if (framework == NULL) {
- LOG(WARNING) << "Ignoring status update from " << pid << " ("
+ LOG(WARNING) << "Ignoring status update " << update
+ << " from " << pid << " ("
<< slave->info.hostname() << "): error, couldn't lookup "
<< "framework " << update.framework_id();
stats.invalidStatusUpdates++;
@@ -1206,7 +1209,8 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
// Lookup the task and see if we need to update anything locally.
Task* task = slave->getTask(update.framework_id(), status.task_id());
if (task == NULL) {
- LOG(WARNING) << "Status update from " << pid << " ("
+ LOG(WARNING) << "Status update " << update
+ << " from " << pid << " ("
<< slave->info.hostname() << "): error, couldn't lookup "
<< "task " << status.task_id();
stats.invalidStatusUpdates++;
@@ -1225,10 +1229,11 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
}
-void Master::exitedExecutor(const SlaveID& slaveId,
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- int32_t status)
+void Master::exitedExecutor(
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ int32_t status)
{
// Only update master's internal data structures here for properly accounting.
// The TASK_LOST updates are handled by the slave.
@@ -1671,6 +1676,7 @@ void Master::processTasks(Offer* offer,
TASK_LOST,
error.get());
+ LOG(INFO) << "Sending status update " << update << " for invalid task";
StatusUpdateMessage message;
message.mutable_update()->CopyFrom(update);
send(framework->pid, message);
@@ -1786,20 +1792,14 @@ void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
<< " of framework " << task->framework_id()
<< " unknown to the slave " << slave->id;
- Framework* framework = getFramework(task->framework_id());
- if (framework != NULL) {
- const StatusUpdate& update = protobuf::createStatusUpdate(
- task->framework_id(),
- slave->id,
- task->task_id(),
- TASK_LOST,
- "Task was not received by the slave");
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ slave->id,
+ task->task_id(),
+ TASK_LOST,
+ "Task is unknown to the slave");
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(update);
- send(framework->pid, message);
- }
- removeTask(task);
+ statusUpdate(update, UPID());
}
}
@@ -1893,6 +1893,10 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
void Master::removeFramework(Framework* framework)
{
+ CHECK_NOTNULL(framework);
+
+ LOG(INFO) << "Removing framework " << framework->id;
+
if (framework->active) {
// Tell the allocator to stop allocating resources to this framework.
allocator->frameworkDeactivated(framework->id);
@@ -1961,6 +1965,10 @@ void Master::removeFramework(Slave* slave, Framework* framework)
CHECK_NOTNULL(slave);
CHECK_NOTNULL(framework);
+ LOG(INFO) << "Removing framework " << framework->id
+ << " from slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
+
// Remove pointers to framework's tasks in slaves, and send status updates.
foreachvalue (Task* task, utils::copy(slave->tasks)) {
// Remove tasks that belong to this framework.
@@ -1968,25 +1976,16 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected yet. For more info
// please see the comments in 'removeFramework(Framework*)'.
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(
- protobuf::createStatusUpdate(
- task->framework_id(),
- task->slave_id(),
- task->task_id(),
- TASK_LOST,
- "Slave " + slave->info.hostname() + " disconnected",
- (task->has_executor_id() ?
- Option<ExecutorID>(task->executor_id()) : None())));
-
- LOG(INFO) << "Sending status update " << message.update()
- << " due to disconnected slave " << slave->id
- << " (" << slave->info.hostname() << ")";
-
- send(framework->pid, message);
-
- // Remove the task from slave and framework.
- removeTask(task);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ task->slave_id(),
+ task->task_id(),
+ TASK_LOST,
+ "Slave " + slave->info.hostname() + " disconnected",
+ (task->has_executor_id()
+ ? Option<ExecutorID>(task->executor_id()) : None()));
+
+ statusUpdate(update, UPID());
}
}
@@ -2122,6 +2121,9 @@ void Master::removeSlave(Slave* slave)
{
CHECK_NOTNULL(slave);
+ LOG(INFO) << "Removing slave " << slave->id
+ << " (" << slave->info.hostname() << ")";
+
// We do this first, to make sure any of the resources recovered
// below (e.g., removeTask()) are ignored by the allocator.
if (!slave->disconnected) {
@@ -2130,8 +2132,6 @@ void Master::removeSlave(Slave* slave)
// Remove pointers to slave's tasks in frameworks, and send status updates
foreachvalue (Task* task, utils::copy(slave->tasks)) {
- Framework* framework = getFramework(task->framework_id());
-
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected. This can be a tricky
// situation for frameworks that want to have high-availability,
@@ -2140,25 +2140,16 @@ void Master::removeSlave(Slave* slave)
// want to do is create a local Framework object to represent that
// framework until it fails over. See the TODO above in
// Master::reregisterSlave.
- if (framework != NULL) {
- StatusUpdateMessage message;
- message.mutable_update()->CopyFrom(
- protobuf::createStatusUpdate(
- task->framework_id(),
- task->slave_id(),
- task->task_id(),
- TASK_LOST,
- "Slave " + slave->info.hostname() + " removed",
- (task->has_executor_id() ?
- Option<ExecutorID>(task->executor_id()) : None())));
-
- LOG(INFO) << "Sending status update " << message.update()
- << " due to the removal of slave "
- << slave->id << " (" << slave->info.hostname() << ")";
-
- send(framework->pid, message);
- }
- removeTask(task);
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ task->slave_id(),
+ task->task_id(),
+ TASK_LOST,
+ "Slave " + slave->info.hostname() + " removed",
+ (task->has_executor_id() ?
+ Option<ExecutorID>(task->executor_id()) : None()));
+
+ statusUpdate(update, UPID());
}
foreach (Offer* offer, utils::copy(slave->offers)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 19d4b38..4d400c2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -185,6 +185,8 @@ message KillTaskMessage {
}
+// NOTE: If 'pid' is present, scheduler driver sends an
+// acknowledgement to the pid.
message StatusUpdateMessage {
required StatusUpdate update = 1;
optional string pid = 2;
http://git-wip-us.apache.org/repos/asf/mesos/blob/fd3584a7/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 7bfe3b1..d74463f 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -431,18 +431,16 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
// At this point, the slave still thinks it's registered, so we
// simulate a status update coming from the slave.
- StatusUpdateMessage statusUpdate;
- statusUpdate.set_pid(stringify(slave.get()));
- statusUpdate.mutable_update()->mutable_framework_id()->set_value(
- frameworkId.get().value());
- statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
- statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
- statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
- "task_id");
- statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
- statusUpdate.mutable_update()->set_timestamp(Clock::now().secs());
- statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
- process::post(master.get(), statusUpdate);
+ TaskID taskId;
+ taskId.set_value("task_id");
+ const StatusUpdate& update = createStatusUpdate(
+ frameworkId.get(), slaveId, taskId, TASK_RUNNING);
+
+ StatusUpdateMessage message;
+ message.mutable_update()->CopyFrom(update);
+ message.set_pid(stringify(slave.get()));
+
+ process::post(master.get(), message);
// The master should shutdown the slave upon receiving the update.
AWAIT_READY(shutdownMessage);