You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/24 22:42:18 UTC
git commit: Moved framework id validation from scheduler driver to master.
Repository: mesos
Updated Branches:
refs/heads/master 0e41ba0cf -> 122fc2e1f
Moved framework id validation from scheduler driver to master.
Review: https://reviews.apache.org/r/27102
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/122fc2e1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/122fc2e1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/122fc2e1
Branch: refs/heads/master
Commit: 122fc2e1f0ccadf9e81f49e45333e8815a059fa6
Parents: 0e41ba0
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Oct 23 11:33:27 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 24 13:40:22 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 8 ++-
src/sched/sched.cpp | 63 +++++-----------------
src/tests/resource_offers_tests.cpp | 91 ++++++++++++++++++++++++++++++++
3 files changed, 110 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 95589b8..9ebdc35 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1884,7 +1884,7 @@ struct SlaveIDChecker : TaskInfoVisitor
{
if (!(task.slave_id() == slave.id)) {
return "Task uses invalid slave " + task.slave_id().value() +
- " while slave " + slave.id.value() + " is expected";
+ " while slave " + slave.id.value() + " is expected";
}
return None();
@@ -2022,6 +2022,12 @@ struct ExecutorInfoChecker : TaskInfoVisitor
"Task has invalid ExecutorInfo: missing field 'framework_id'");
}
+ if (!(task.executor().framework_id() == framework.id)) {
+ return string("ExecutorInfo has an invalid FrameworkID") +
+ " (Actual: " + stringify(task.executor().framework_id()) +
+ " vs Expected: " + stringify(framework.id) + ")";
+ }
+
const ExecutorID& executorId = task.executor().executor_id();
Option<ExecutorInfo> executorInfo = None();
http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index e89e5e5..0fb8c7b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -822,12 +822,14 @@ protected:
VLOG(1) << "Ignoring launch tasks message as master is disconnected";
// NOTE: Reply to the framework with TASK_LOST messages for each
// task. This is a hack for now, to not let the scheduler
- // believe the tasks are forever in PENDING state, when actually
- // the master never received the launchTask message. Also,
- // realize that this hack doesn't capture the case when the
- // scheduler process sends it but the master never receives it
- // (message lost, master failover etc). In the future, this
- // should be solved by the replicated log and timeouts.
+ // believe the tasks are launched, when actually the master
+ // never received the launchTasks message. Also, realize that
+ // this hack doesn't capture the case when the scheduler process
+ // sends it but the master never receives it (message lost,
+ // master failover etc). The correct way for schedulers to deal
+ // with this situation is to use 'reconcileTasks()'.
+ // TODO(vinod): Kill this optimization in 0.22.0, to give
+ // frameworks time to implement reconciliation.
foreach (const TaskInfo& task, tasks) {
StatusUpdate update;
update.mutable_framework_id()->MergeFrom(framework.id());
@@ -843,55 +845,14 @@ protected:
return;
}
+ // Set TaskInfo.executor.framework_id, if it's missing.
vector<TaskInfo> result;
-
- foreach (const TaskInfo& task, tasks) {
- // Check that each TaskInfo has either an ExecutorInfo or a
- // CommandInfo but not both.
- if (task.has_executor() == task.has_command()) {
- StatusUpdate update;
- update.mutable_framework_id()->MergeFrom(framework.id());
- TaskStatus* status = update.mutable_status();
- status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_LOST);
- status->set_message(
- "TaskInfo must have either an 'executor' or a 'command'");
- update.set_timestamp(Clock::now().secs());
- update.set_uuid(UUID::random().toBytes());
-
- statusUpdate(UPID(), update, UPID());
- continue;
- }
-
- // Ensure the ExecutorInfo.framework_id is valid, if present.
- if (task.has_executor() &&
- task.executor().has_framework_id() &&
- !(task.executor().framework_id() == framework.id())) {
- StatusUpdate update;
- update.mutable_framework_id()->MergeFrom(framework.id());
- TaskStatus* status = update.mutable_status();
- status->mutable_task_id()->MergeFrom(task.task_id());
- status->set_state(TASK_LOST);
- status->set_message(
- "ExecutorInfo has an invalid FrameworkID (Actual: " +
- stringify(task.executor().framework_id()) + " vs Expected: " +
- stringify(framework.id()) + ")");
- update.set_timestamp(Clock::now().secs());
- update.set_uuid(UUID::random().toBytes());
-
- statusUpdate(UPID(), update, UPID());
- continue;
- }
-
- TaskInfo copy = task;
-
- // Set the ExecutorInfo.framework_id if missing.
+ foreach (TaskInfo task, tasks) {
if (task.has_executor() && !task.executor().has_framework_id()) {
- copy.mutable_executor()->mutable_framework_id()->CopyFrom(
+ task.mutable_executor()->mutable_framework_id()->CopyFrom(
framework.id());
}
-
- result.push_back(copy);
+ result.push_back(task);
}
LaunchTasksMessage message;
http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 060039e..fe66432 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -24,6 +24,7 @@
#include <mesos/scheduler.hpp>
#include <stout/strings.hpp>
+#include <stout/uuid.hpp>
#include "master/hierarchical_allocator_process.hpp"
#include "master/master.hpp"
@@ -99,6 +100,96 @@ TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
}
+TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // Create an executor with a random framework id.
+ ExecutorInfo executor;
+ executor = DEFAULT_EXECUTOR_INFO;
+ executor.mutable_framework_id()->set_value(UUID::random().toString());
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(executor, 1, 1, 16, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ driver.start();
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(strings::startsWith(
+ status.get().message(), "ExecutorInfo has an invalid FrameworkID"));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Create a task that uses both command info and executor info.
+ TaskInfo task = createTask(offers.get()[0], ""); // Command task.
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); // Executor task.
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_LOST, status.get().state());
+ EXPECT_TRUE(strings::contains(
+ status.get().message(), "CommandInfo or ExecutorInfo present"));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
TEST_F(ResourceOffersTest, TaskUsesNoResources)
{
Try<PID<Master> > master = StartMaster();