You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/25 01:36:45 UTC
[1/3] mesos git commit: Added tests for unset framework pid in the
slave.
Repository: mesos
Updated Branches:
refs/heads/master a9312c237 -> 50696fa2f
Added tests for unset framework pid in the slave.
Since we do not yet have HTTP schedulers, this adds tests
that spoof empty pids coming from the master.
Review: https://reviews.apache.org/r/36761
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50696fa2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50696fa2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50696fa2
Branch: refs/heads/master
Commit: 50696fa2fa04bde042d992c8f9f2bd6020b79f4e
Parents: 9172a5f
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Jul 23 18:07:31 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700
----------------------------------------------------------------------
src/tests/mesos.hpp | 8 +-
src/tests/slave_tests.cpp | 275 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 281 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 1759d7e..7538c96 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1303,7 +1303,6 @@ const ::testing::Matcher<const std::vector<Offer>& > OfferEq(int cpus, int mem)
}
-// Definition of the SendStatusUpdateFromTask action to be used with gmock.
ACTION_P(SendStatusUpdateFromTask, state)
{
TaskStatus status;
@@ -1313,7 +1312,6 @@ ACTION_P(SendStatusUpdateFromTask, state)
}
-// Definition of the SendStatusUpdateFromTaskID action to be used with gmock.
ACTION_P(SendStatusUpdateFromTaskID, state)
{
TaskStatus status;
@@ -1323,6 +1321,12 @@ ACTION_P(SendStatusUpdateFromTaskID, state)
}
+ACTION_P(SendFrameworkMessage, data)
+{
+ arg0->sendFrameworkMessage(data);
+}
+
+
#define FUTURE_PROTOBUF(message, from, to) \
FutureProtobuf(message, from, to)
http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 64cef6e..e086817 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2386,6 +2386,281 @@ TEST_F(SlaveTest, CheckpointedResourcesIncludedInUsage)
Shutdown();
}
+
+// Ensures that the slave correctly handles a framework without
+// a pid, which will be the case for HTTP schedulers. In
+// particular, executor messages should be routed through the
+// master.
+TEST_F(SlaveTest, HTTPScheduler)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ Try<PID<Slave>> slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ // Capture the run task message to unset the framework pid.
+ Future<RunTaskMessage> runTaskMessage =
+ DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+ driver.start();
+
+ AWAIT_READY(runTaskMessage);
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendFrameworkMessage("message"));
+
+ // The slave should forward the message through the master.
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+ // The master should then forward the message to the framework.
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+ Future<Nothing> frameworkMessage;
+ EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+ .WillOnce(FutureSatisfy(&frameworkMessage));
+
+ // Clear the pid in the run task message so that the slave
+ // thinks this is an HTTP scheduler.
+ RunTaskMessage spoofed = runTaskMessage.get();
+ spoofed.set_pid("");
+
+ process::post(master.get(), slave.get(), spoofed);
+
+ AWAIT_READY(executorToFrameworkMessage1);
+ AWAIT_READY(executorToFrameworkMessage2);
+
+ AWAIT_READY(frameworkMessage);
+
+ // Must call shutdown before the mock executor gets deallocated.
+ Shutdown();
+}
+
+
+// Ensures that the slave correctly handles a framework upgrading
+// to HTTP (going from having a pid, to not having a pid). In
+// particular, executor messages should be routed through the
+// master.
+TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ Try<PID<Slave>> slave = StartSlave(&exec);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ Future<Nothing> launchTask;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(FutureSatisfy(&launchTask));
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+ AWAIT_READY(launchTask);
+
+ // Now spoof a live upgrade of the framework by updating
+ // the framework information to have an empty pid.
+ UpdateFrameworkMessage updateFrameworkMessage;
+ updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get());
+ updateFrameworkMessage.set_pid("");
+
+ process::post(master.get(), slave.get(), updateFrameworkMessage);
+
+ // Send a message from the executor; the slave should forward
+ // the message through the master.
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+ Future<Nothing> frameworkMessage;
+ EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+ .WillOnce(FutureSatisfy(&frameworkMessage));
+
+ execDriver->sendFrameworkMessage("message");
+
+ AWAIT_READY(executorToFrameworkMessage1);
+ AWAIT_READY(executorToFrameworkMessage2);
+
+ AWAIT_READY(frameworkMessage);
+
+ // Must call shutdown before the mock executor gets deallocated.
+ Shutdown();
+}
+
+
+// Ensures that the slave can restart when there is an empty
+// framework pid. Executor messages should go through the
+// master (instead of directly to the scheduler!).
+TEST_F(SlaveTest, HTTPSchedulerSlaveRestart)
+{
+ Try<PID<Master> > master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = this->CreateSlaveFlags();
+
+ Fetcher fetcher;
+
+ Try<slave::MesosContainerizer*> containerizer =
+ slave::MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(containerizer);
+
+ Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ // Capture the executor information.
+ Future<Message> registerExecutorMessage =
+ FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ SlaveID slaveId = offers.get()[0].slave_id();
+
+ // Capture the run task so that we can unset the framework pid.
+ Future<RunTaskMessage> runTaskMessage =
+ DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(runTaskMessage);
+
+ // Clear the pid in the run task message so that the slave
+ // thinks this is an HTTP scheduler.
+ RunTaskMessage spoofedRunTaskMessage = runTaskMessage.get();
+ spoofedRunTaskMessage.set_pid("");
+
+ process::post(master.get(), slave.get(), spoofedRunTaskMessage);
+
+ AWAIT_READY(registerExecutorMessage);
+
+ RegisterExecutorMessage registerExecutor;
+ registerExecutor.ParseFromString(registerExecutorMessage.get().body);
+ ExecutorID executorId = registerExecutor.executor_id();
+ UPID executorPid = registerExecutorMessage.get().from;
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // Restart the slave.
+ Stop(slave.get());
+
+ Try<slave::MesosContainerizer*> containerizer2 =
+ slave::MesosContainerizer::create(flags, true, &fetcher);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Capture this so that we can unset the framework pid.
+ Future<UpdateFrameworkMessage> updateFrameworkMessage =
+ DROP_PROTOBUF(UpdateFrameworkMessage(), _, _);
+
+ slave = StartSlave(containerizer2.get(), flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveReregisteredMessage);
+ AWAIT_READY(updateFrameworkMessage);
+
+ // Make sure the slave sees an empty framework pid after recovery.
+ UpdateFrameworkMessage spoofedUpdateFrameworkMessage =
+ updateFrameworkMessage.get();
+ spoofedUpdateFrameworkMessage.set_pid("");
+
+ process::post(master.get(), slave.get(), spoofedUpdateFrameworkMessage);
+
+ // Spoof a message from the executor, to ensure the slave
+ // sends it through the master (instead of directly to the
+ // scheduler driver!).
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+ Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+ FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+ Future<Nothing> frameworkMessage;
+ EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+ .WillOnce(FutureSatisfy(&frameworkMessage));
+
+ ExecutorToFrameworkMessage executorToFrameworkMessage;
+ executorToFrameworkMessage.mutable_slave_id()->CopyFrom(slaveId);
+ executorToFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId);
+ executorToFrameworkMessage.mutable_executor_id()->CopyFrom(executorId);
+ executorToFrameworkMessage.set_data("message");
+
+ process::post(executorPid, slave.get(), executorToFrameworkMessage);
+
+ AWAIT_READY(executorToFrameworkMessage1);
+ AWAIT_READY(executorToFrameworkMessage2);
+ AWAIT_READY(frameworkMessage);
+
+ driver.stop();
+ driver.join();
+
+ this->Shutdown();
+
+ delete containerizer.get();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[3/3] mesos git commit: Added an ExecutorToFramework message handler
on the master.
Posted by bm...@apache.org.
Added an ExecutorToFramework message handler on the master.
This enables the slave to forward messages through the master.
Review: https://reviews.apache.org/r/36759
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac70a594
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac70a594
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac70a594
Branch: refs/heads/master
Commit: ac70a594e9b9700c4cd12eb95a26f8470785a7a9
Parents: a9312c2
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Jul 22 17:51:27 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 96 +++++++++++++++++++++++++++++++++++++++------
src/master/master.hpp | 7 ++++
src/master/metrics.cpp | 14 +++++++
src/master/metrics.hpp | 3 ++
src/slave/slave.cpp | 1 -
5 files changed, 109 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7796630..6d64bfc 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -713,6 +713,16 @@ void Master::initialize()
&StatusUpdateMessage::update,
&StatusUpdateMessage::pid);
+ // Added in 0.24.0 to support HTTP schedulers. Since
+ // these do not have a pid, the slave must forward
+ // messages through the master.
+ install<ExecutorToFrameworkMessage>(
+ &Master::executorMessage,
+ &ExecutorToFrameworkMessage::slave_id,
+ &ExecutorToFrameworkMessage::framework_id,
+ &ExecutorToFrameworkMessage::executor_id,
+ &ExecutorToFrameworkMessage::data);
+
install<ReconcileTasksMessage>(
&Master::reconcileTasks,
&ReconcileTasksMessage::framework_id,
@@ -3199,24 +3209,24 @@ void Master::schedulerMessage(
const ExecutorID& executorId,
const string& data)
{
- ++metrics->messages_framework_to_executor;
+ metrics->messages_framework_to_executor++;
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
- LOG(WARNING)
- << "Ignoring framework message for executor " << executorId
- << " of framework " << frameworkId
- << " because the framework cannot be found";
+ LOG(WARNING) << "Ignoring framework message"
+ << " for executor '" << executorId << "'"
+ << " of framework " << frameworkId
+ << " because the framework cannot be found";
metrics->invalid_framework_to_executor_messages++;
return;
}
if (from != framework->pid) {
- LOG(WARNING)
- << "Ignoring framework message for executor " << executorId
- << " of framework " << *framework
- << " because it is not expected from " << from;
+ LOG(WARNING) << "Ignoring framework message"
+ << " for executor '" << executorId << "'"
+ << " of framework " << *framework
+ << " because it is not expected from " << from;
metrics->invalid_framework_to_executor_messages++;
return;
}
@@ -3230,6 +3240,69 @@ void Master::schedulerMessage(
}
+void Master::executorMessage(
+ const UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const string& data)
+{
+ metrics->messages_executor_to_framework++;
+
+ if (slaves.removed.get(slaveId).isSome()) {
+ // If the slave is removed, we have already informed
+ // frameworks that its tasks were LOST, so the slave
+ // should shut down.
+ LOG(WARNING) << "Ignoring executor message"
+ << " from executor" << " '" << executorId << "'"
+ << " of framework " << frameworkId
+ << " on removed slave " << slaveId
+ << " ; asking slave to shutdown";
+
+ ShutdownMessage message;
+ message.set_message("Executor message from unknown slave");
+ reply(message);
+ metrics->invalid_executor_to_framework_messages++;
+ return;
+ }
+
+ // The slave should (re-)register with the master before
+ // forwarding executor messages.
+ if (!slaves.registered.contains(slaveId)) {
+ LOG(WARNING) << "Ignoring executor message"
+ << " from executor '" << executorId << "'"
+ << " of framework " << frameworkId
+ << " on unknown slave " << slaveId;
+ metrics->invalid_executor_to_framework_messages++;
+ return;
+ }
+
+ Slave* slave = slaves.registered.get(slaveId);
+ CHECK_NOTNULL(slave);
+
+ Framework* framework = getFramework(frameworkId);
+
+ if (framework == NULL) {
+ LOG(WARNING) << "Not forwarding executor message"
+ << " for executor '" << executorId << "'"
+ << " of framework " << frameworkId
+ << " on slave " << *slave
+ << " because the framework is unknown";
+ metrics->invalid_executor_to_framework_messages++;
+ return;
+ }
+
+ ExecutorToFrameworkMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(framework->pid, message);
+
+ metrics->valid_executor_to_framework_messages++;
+}
+
+
void Master::message(
Framework* framework,
const scheduler::Call::Message& message)
@@ -3850,8 +3923,6 @@ void Master::exitedExecutor(
return;
}
- // Only update master's internal data structures here for proper
- // accounting. The TASK_LOST updates are handled by the slave.
if (!slaves.registered.contains(slaveId)) {
LOG(WARNING) << "Ignoring exited executor '" << executorId
<< "' of framework " << frameworkId
@@ -3859,6 +3930,9 @@ void Master::exitedExecutor(
return;
}
+ // Only update master's internal data structures here for proper
+ // accounting. The TASK_LOST updates are handled by the slave.
+
Slave* slave = slaves.registered.get(slaveId);
CHECK_NOTNULL(slave);
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 29113cb..827d0d5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -716,6 +716,13 @@ public:
const ExecutorID& executorId,
const std::string& data);
+ void executorMessage(
+ const process::UPID& from,
+ const SlaveID& slaveId,
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const std::string& data);
+
void registerSlave(
const process::UPID& from,
const SlaveInfo& slaveInfo,
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 10e2937..d79206f 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -118,6 +118,8 @@ Metrics::Metrics(const Master& master)
"master/messages_reconcile_tasks"),
messages_framework_to_executor(
"master/messages_framework_to_executor"),
+ messages_executor_to_framework(
+ "master/messages_executor_to_framework"),
messages_register_slave(
"master/messages_register_slave"),
messages_reregister_slave(
@@ -136,6 +138,10 @@ Metrics::Metrics(const Master& master)
"master/valid_framework_to_executor_messages"),
invalid_framework_to_executor_messages(
"master/invalid_framework_to_executor_messages"),
+ valid_executor_to_framework_messages(
+ "master/valid_executor_to_framework_messages"),
+ invalid_executor_to_framework_messages(
+ "master/invalid_executor_to_framework_messages"),
valid_status_updates(
"master/valid_status_updates"),
invalid_status_updates(
@@ -214,6 +220,7 @@ Metrics::Metrics(const Master& master)
process::metrics::add(messages_revive_offers);
process::metrics::add(messages_reconcile_tasks);
process::metrics::add(messages_framework_to_executor);
+ process::metrics::add(messages_executor_to_framework);
// Messages from slaves.
process::metrics::add(messages_register_slave);
@@ -229,6 +236,9 @@ Metrics::Metrics(const Master& master)
process::metrics::add(valid_framework_to_executor_messages);
process::metrics::add(invalid_framework_to_executor_messages);
+ process::metrics::add(valid_executor_to_framework_messages);
+ process::metrics::add(invalid_executor_to_framework_messages);
+
process::metrics::add(valid_status_updates);
process::metrics::add(invalid_status_updates);
@@ -345,6 +355,7 @@ Metrics::~Metrics()
process::metrics::remove(messages_revive_offers);
process::metrics::remove(messages_reconcile_tasks);
process::metrics::remove(messages_framework_to_executor);
+ process::metrics::remove(messages_executor_to_framework);
// Messages from slaves.
process::metrics::remove(messages_register_slave);
@@ -360,6 +371,9 @@ Metrics::~Metrics()
process::metrics::remove(valid_framework_to_executor_messages);
process::metrics::remove(invalid_framework_to_executor_messages);
+ process::metrics::remove(valid_executor_to_framework_messages);
+ process::metrics::remove(invalid_executor_to_framework_messages);
+
process::metrics::remove(valid_status_updates);
process::metrics::remove(invalid_status_updates);
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index d37d74a..5e96a5f 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -131,6 +131,7 @@ struct Metrics
process::metrics::Counter messages_revive_offers;
process::metrics::Counter messages_reconcile_tasks;
process::metrics::Counter messages_framework_to_executor;
+ process::metrics::Counter messages_executor_to_framework;
// Messages from slaves.
process::metrics::Counter messages_register_slave;
@@ -145,6 +146,8 @@ struct Metrics
process::metrics::Counter valid_framework_to_executor_messages;
process::metrics::Counter invalid_framework_to_executor_messages;
+ process::metrics::Counter valid_executor_to_framework_messages;
+ process::metrics::Counter invalid_executor_to_framework_messages;
process::metrics::Counter valid_status_updates;
process::metrics::Counter invalid_status_updates;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index dc12c45..784fdc8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2989,7 +2989,6 @@ void Slave::executorMessage(
return;
}
-
LOG(INFO) << "Sending message for framework " << frameworkId
<< " to " << framework->pid;
[2/3] mesos git commit: Handle scheduler pid as optional in the slave.
Posted by bm...@apache.org.
Handle scheduler pid as optional in the slave.
This is in anticipation of HTTP scheduler support in 0.24.0.
Note that the 'pid' is set for driver-based schedulers. The
corresponding master changes to not set 'pid' for HTTP
schedulers have not occurred yet.
Review: https://reviews.apache.org/r/36760
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9172a5f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9172a5f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9172a5f5
Branch: refs/heads/master
Commit: 9172a5f50bc26c2bd88ff7382a0b5f0ccaf73b14
Parents: ac70a59
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Jul 23 15:17:22 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700
----------------------------------------------------------------------
src/master/master.cpp | 5 +-
src/messages/messages.proto | 13 +++++-
src/slave/slave.cpp | 99 ++++++++++++++++++++++++++++++----------
src/slave/slave.hpp | 16 +++++--
src/slave/state.hpp | 4 ++
src/tests/mesos.cpp | 7 ++-
src/tests/mesos.hpp | 8 ++--
src/tests/slave_tests.cpp | 5 +-
8 files changed, 111 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6d64bfc..613a011 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5042,8 +5042,9 @@ void Master::addSlave(
// TODO(vinod): Reconcile the notion of a completed framework across the
// master and slave.
foreach (const Archive::Framework& completedFramework, completedFrameworks) {
- const FrameworkID& frameworkId = completedFramework.framework_info().id();
- Framework* framework = getFramework(frameworkId);
+ Framework* framework = getFramework(
+ completedFramework.framework_info().id());
+
foreach (const Task& task, completedFramework.tasks()) {
if (framework != NULL) {
VLOG(2) << "Re-adding completed task " << task.task_id()
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 165a16d..8977d8e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -193,8 +193,15 @@ message RunTaskMessage {
// TODO(karya): Remove framework_id after MESOS-2559 has shipped.
optional FrameworkID framework_id = 1 [deprecated = true];
required FrameworkInfo framework = 2;
- required string pid = 3;
required TaskInfo task = 4;
+
+ // The pid of the framework. This was moved to 'optional' in
+ // 0.24.0 to support schedulers using the HTTP API. For now, we
+ // continue to always set pid since it was required in 0.23.x.
+ // When 'pid' is unset, or set to empty string, the slave will
+ // forward executor messages through the master. For schedulers
+ // still using the driver, this will remain set.
+ optional string pid = 3;
}
@@ -335,7 +342,9 @@ message ShutdownExecutorMessage {
message UpdateFrameworkMessage {
required FrameworkID framework_id = 1;
- required string pid = 2;
+
+ // See the comment on RunTaskMessage.pid.
+ optional string pid = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 784fdc8..4ba95f9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1165,13 +1165,16 @@ void Slave::doReliableRegistration(Duration maxBackoff)
foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
VLOG(1) << "Reregistering completed framework "
<< completedFramework->id();
+
Archive::Framework* completedFramework_ =
message.add_completed_frameworks();
- FrameworkInfo* frameworkInfo =
- completedFramework_->mutable_framework_info();
- frameworkInfo->CopyFrom(completedFramework->info);
- completedFramework_->set_pid(completedFramework->pid);
+ completedFramework_->mutable_framework_info()->CopyFrom(
+ completedFramework->info);
+
+ if (completedFramework->pid.isSome()) {
+ completedFramework_->set_pid(completedFramework->pid.get());
+ }
foreach (const Owned<Executor>& executor,
completedFramework->completedExecutors) {
@@ -1179,10 +1182,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
<< " with " << executor->terminatedTasks.size()
<< " terminated tasks, " << executor->completedTasks.size()
<< " completed tasks";
+
foreach (const Task* task, executor->terminatedTasks.values()) {
VLOG(2) << "Reregistering terminated task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
}
+
foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
VLOG(2) << "Reregistering completed task " << task->task_id();
completedFramework_->add_tasks()->CopyFrom(*task);
@@ -1222,7 +1227,7 @@ void Slave::runTask(
const UPID& from,
const FrameworkInfo& frameworkInfo_,
const FrameworkID& frameworkId_,
- const string& pid,
+ const UPID& pid,
TaskInfo task)
{
if (master != from) {
@@ -1291,7 +1296,13 @@ void Slave::runTask(
unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
}
- framework = new Framework(this, frameworkInfo, pid);
+ Option<UPID> frameworkPid = None();
+
+ if (pid != UPID()) {
+ frameworkPid = pid;
+ }
+
+ framework = new Framework(this, frameworkInfo, frameworkPid);
frameworks[frameworkId] = framework;
// Is this same framework in completedFrameworks? If so, move the completed
@@ -1340,14 +1351,13 @@ void Slave::runTask(
// Run the task after the unschedules are done.
unschedule.onAny(
- defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
+ defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
}
void Slave::_runTask(
const Future<bool>& future,
const FrameworkInfo& frameworkInfo,
- const string& pid,
const TaskInfo& task)
{
const FrameworkID frameworkId = frameworkInfo.id();
@@ -1733,8 +1743,12 @@ void Slave::runTasks(
RunTaskMessage message;
message.mutable_framework_id()->MergeFrom(framework->id());
message.mutable_framework()->MergeFrom(framework->info);
- message.set_pid(framework->pid);
message.mutable_task()->MergeFrom(task);
+
+ // Note that 0.23.x executors require the 'pid' to be set
+ // to decode the message, but do not use the field.
+ message.set_pid(framework->pid.getOrElse(UPID()));
+
send(executor->pid, message);
}
}
@@ -2087,7 +2101,9 @@ void Slave::schedulerMessage(
}
-void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
+void Slave::updateFramework(
+ const FrameworkID& frameworkId,
+ const UPID& pid)
{
CHECK(state == RECOVERING || state == DISCONNECTED ||
state == RUNNING || state == TERMINATING)
@@ -2115,15 +2131,25 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
case Framework::RUNNING: {
LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
- framework->pid = pid;
+ if (pid == UPID()) {
+ framework->pid = None();
+ } else {
+ framework->pid = pid;
+ }
+
if (framework->info.checkpoint()) {
- // Checkpoint the framework pid.
+ // Checkpoint the framework pid. Note that when the 'pid'
+ // is None, we checkpoint a default UPID() because
+ // 0.23.x slaves consider a missing pid file to be an
+ // error.
const string path = paths::getFrameworkPidPath(
metaDir, info.id(), frameworkId);
- VLOG(1) << "Checkpointing framework pid '"
- << framework->pid << "' to '" << path << "'";
- CHECK_SOME(state::checkpoint(path, framework->pid));
+ VLOG(1) << "Checkpointing framework pid"
+ << " '" << framework->pid.getOrElse(UPID()) << "'"
+ << " to '" << path << "'";
+
+ CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
}
// Inform status update manager to immediately resend any pending
@@ -2989,15 +3015,23 @@ void Slave::executorMessage(
return;
}
- LOG(INFO) << "Sending message for framework " << frameworkId
- << " to " << framework->pid;
-
ExecutorToFrameworkMessage message;
message.mutable_slave_id()->MergeFrom(slaveId);
message.mutable_framework_id()->MergeFrom(frameworkId);
message.mutable_executor_id()->MergeFrom(executorId);
message.set_data(data);
- send(framework->pid, message);
+
+ CHECK_SOME(master);
+
+ if (framework->pid.isSome()) {
+ LOG(INFO) << "Sending message for framework " << frameworkId
+ << " to " << framework->pid.get();
+ send(framework->pid.get(), message);
+ } else {
+ LOG(INFO) << "Sending message for framework " << frameworkId
+ << " through the master " << master.get();
+ send(master.get(), message);
+ }
metrics.valid_framework_messages++;
}
@@ -4142,8 +4176,17 @@ void Slave::recoverFramework(const FrameworkState& state)
CHECK_EQ(frameworkInfo.id(), state.id);
}
+ // In 0.24.0, HTTP schedulers are supported and these do not
+ // have a 'pid'. In this case, the slave will checkpoint UPID().
CHECK_SOME(state.pid);
- Framework* framework = new Framework(this, frameworkInfo, state.pid.get());
+
+ Option<UPID> pid = state.pid.get();
+
+ if (pid.get() == UPID()) {
+ pid = None();
+ }
+
+ Framework* framework = new Framework(this, frameworkInfo, pid);
frameworks[framework->id()] = framework;
// Now recover the executors for this framework.
@@ -4662,7 +4705,7 @@ double Slave::_resources_revocable_percent(const string& name)
Framework::Framework(
Slave* _slave,
const FrameworkInfo& _info,
- const UPID& _pid)
+ const Option<UPID>& _pid)
: state(RUNNING),
slave(_slave),
info(_info),
@@ -4675,15 +4718,21 @@ Framework::Framework(
slave->metaDir, slave->info.id(), id());
VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'";
+
CHECK_SOME(state::checkpoint(path, info));
- // Checkpoint the framework pid.
+ // Checkpoint the framework pid. Note that we checkpoint a
+ // UPID() when it is None (for HTTP schedulers) because
+ // 0.23.x slaves consider a missing pid file to be an
+ // error.
path = paths::getFrameworkPidPath(
slave->metaDir, slave->info.id(), id());
- VLOG(1) << "Checkpointing framework pid '"
- << pid << "' to '" << path << "'";
- CHECK_SOME(state::checkpoint(path, pid));
+ VLOG(1) << "Checkpointing framework pid"
+ << " '" << pid.getOrElse(UPID()) << "'"
+ << " to '" << path << "'";
+
+ CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID())));
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dec4ca8..41d0949 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -117,14 +117,13 @@ public:
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
- const std::string& pid,
+ const process::UPID& pid,
TaskInfo task);
// Made 'virtual' for Slave mocking.
virtual void _runTask(
const process::Future<bool>& future,
const FrameworkInfo& frameworkInfo,
- const std::string& pid,
const TaskInfo& task);
process::Future<bool> unschedule(const std::string& path);
@@ -150,7 +149,9 @@ public:
const ExecutorID& executorId,
const std::string& data);
- void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
+ void updateFramework(
+ const FrameworkID& frameworkId,
+ const process::UPID& pid);
void checkpointResources(const std::vector<Resource>& checkpointedResources);
@@ -634,7 +635,7 @@ struct Framework
Framework(
Slave* slave,
const FrameworkInfo& info,
- const process::UPID& pid);
+ const Option<process::UPID>& pid);
~Framework();
@@ -660,7 +661,12 @@ struct Framework
const FrameworkInfo info;
- UPID pid;
+ // Frameworks using the scheduler driver will have a 'pid',
+ // which allows us to send executor messages directly to the
+ // driver. Frameworks using the HTTP API (in 0.24.0) will
+ // not have a 'pid', in which case executor messages are
+ // sent through the master.
+ Option<UPID> pid;
// Executors with pending tasks.
hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending;
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 4e00468..cecf200 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -248,7 +248,11 @@ struct FrameworkState
FrameworkID id;
Option<FrameworkInfo> info;
+
+ // Note that HTTP frameworks (supported in 0.24.0) do not have a
+ // PID, in which case 'pid' is Some(UPID()) rather than None().
Option<process::UPID> pid;
+
hashmap<ExecutorID, ExecutorState> executors;
unsigned int errors;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index f09ef0f..f3b7315 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -462,7 +462,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
// Set up default behaviors, calling the original methods.
EXPECT_CALL(*this, runTask(_, _, _, _, _))
.WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
- EXPECT_CALL(*this, _runTask(_, _, _, _))
+ EXPECT_CALL(*this, _runTask(_, _, _))
.WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
EXPECT_CALL(*this, killTask(_, _, _))
.WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -485,7 +485,7 @@ void MockSlave::unmocked_runTask(
const UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
- const std::string& pid,
+ const UPID& pid,
TaskInfo task)
{
slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
@@ -495,10 +495,9 @@ void MockSlave::unmocked_runTask(
void MockSlave::unmocked__runTask(
const Future<bool>& future,
const FrameworkInfo& frameworkInfo,
- const std::string& pid,
const TaskInfo& task)
{
- slave::Slave::_runTask(future, frameworkInfo, pid, task);
+ slave::Slave::_runTask(future, frameworkInfo, task);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8a76b4f..1759d7e 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -811,26 +811,24 @@ public:
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
- const std::string& pid,
+ const process::UPID& pid,
TaskInfo task));
void unmocked_runTask(
const process::UPID& from,
const FrameworkInfo& frameworkInfo,
const FrameworkID& frameworkId,
- const std::string& pid,
+ const process::UPID& pid,
TaskInfo task);
- MOCK_METHOD4(_runTask, void(
+ MOCK_METHOD3(_runTask, void(
const process::Future<bool>& future,
const FrameworkInfo& frameworkInfo,
- const std::string& pid,
const TaskInfo& task));
void unmocked__runTask(
const process::Future<bool>& future,
const FrameworkInfo& frameworkInfo,
- const std::string& pid,
const TaskInfo& task);
MOCK_METHOD3(killTask, void(
http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b145d76..64cef6e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1793,7 +1793,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
// later, tie reaching the critical moment when to kill the task to
// a future.
Future<Nothing> _runTask;
- EXPECT_CALL(slave, _runTask(_, _, _, _))
+ EXPECT_CALL(slave, _runTask(_, _, _))
.WillOnce(DoAll(FutureSatisfy(&_runTask),
SaveArg<0>(&future),
SaveArg<1>(&frameworkInfo)));
@@ -1818,8 +1818,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
driver.killTask(task.task_id());
AWAIT_READY(killTask);
- slave.unmocked__runTask(
- future, frameworkInfo, master.get(), task);
+ slave.unmocked__runTask(future, frameworkInfo, task);
AWAIT_READY(removeFramework);