You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text rendering).
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/24 22:42:18 UTC

git commit: Moved framework id validation from scheduler driver to master.

Repository: mesos
Updated Branches:
  refs/heads/master 0e41ba0cf -> 122fc2e1f


Moved framework id validation from scheduler driver to master.

Review: https://reviews.apache.org/r/27102


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/122fc2e1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/122fc2e1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/122fc2e1

Branch: refs/heads/master
Commit: 122fc2e1f0ccadf9e81f49e45333e8815a059fa6
Parents: 0e41ba0
Author: Vinod Kone <vi...@gmail.com>
Authored: Thu Oct 23 11:33:27 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Fri Oct 24 13:40:22 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp               |  8 ++-
 src/sched/sched.cpp                 | 63 +++++-----------------
 src/tests/resource_offers_tests.cpp | 91 ++++++++++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 95589b8..9ebdc35 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1884,7 +1884,7 @@ struct SlaveIDChecker : TaskInfoVisitor
   {
     if (!(task.slave_id() == slave.id)) {
       return "Task uses invalid slave " + task.slave_id().value() +
-          " while slave " + slave.id.value() + " is expected";
+             " while slave " + slave.id.value() + " is expected";
     }
 
     return None();
@@ -2022,6 +2022,12 @@ struct ExecutorInfoChecker : TaskInfoVisitor
             "Task has invalid ExecutorInfo: missing field 'framework_id'");
       }
 
+      if (!(task.executor().framework_id() == framework.id)) {
+        return string("ExecutorInfo has an invalid FrameworkID") +
+               " (Actual: " + stringify(task.executor().framework_id()) +
+               " vs Expected: " + stringify(framework.id) + ")";
+      }
+
       const ExecutorID& executorId = task.executor().executor_id();
       Option<ExecutorInfo> executorInfo = None();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index e89e5e5..0fb8c7b 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -822,12 +822,14 @@ protected:
       VLOG(1) << "Ignoring launch tasks message as master is disconnected";
       // NOTE: Reply to the framework with TASK_LOST messages for each
       // task. This is a hack for now, to not let the scheduler
-      // believe the tasks are forever in PENDING state, when actually
-      // the master never received the launchTask message. Also,
-      // realize that this hack doesn't capture the case when the
-      // scheduler process sends it but the master never receives it
-      // (message lost, master failover etc).  In the future, this
-      // should be solved by the replicated log and timeouts.
+      // believe the tasks are launched, when actually the master
+      // never received the launchTasks message. Also, realize that
+      // this hack doesn't capture the case when the scheduler process
+      // sends it but the master never receives it (message lost,
+      // master failover etc). The correct way for schedulers to deal
+      // with this situation is to use 'reconcileTasks()'.
+      // TODO(vinod): Kill this optimization in 0.22.0, to give
+      // frameworks time to implement reconciliation.
       foreach (const TaskInfo& task, tasks) {
         StatusUpdate update;
         update.mutable_framework_id()->MergeFrom(framework.id());
@@ -843,55 +845,14 @@ protected:
       return;
     }
 
+    // Set TaskInfo.executor.framework_id, if it's missing.
     vector<TaskInfo> result;
-
-    foreach (const TaskInfo& task, tasks) {
-      // Check that each TaskInfo has either an ExecutorInfo or a
-      // CommandInfo but not both.
-      if (task.has_executor() == task.has_command()) {
-        StatusUpdate update;
-        update.mutable_framework_id()->MergeFrom(framework.id());
-        TaskStatus* status = update.mutable_status();
-        status->mutable_task_id()->MergeFrom(task.task_id());
-        status->set_state(TASK_LOST);
-        status->set_message(
-            "TaskInfo must have either an 'executor' or a 'command'");
-        update.set_timestamp(Clock::now().secs());
-        update.set_uuid(UUID::random().toBytes());
-
-        statusUpdate(UPID(), update, UPID());
-        continue;
-      }
-
-      // Ensure the ExecutorInfo.framework_id is valid, if present.
-      if (task.has_executor() &&
-          task.executor().has_framework_id() &&
-          !(task.executor().framework_id() == framework.id())) {
-        StatusUpdate update;
-        update.mutable_framework_id()->MergeFrom(framework.id());
-        TaskStatus* status = update.mutable_status();
-        status->mutable_task_id()->MergeFrom(task.task_id());
-        status->set_state(TASK_LOST);
-        status->set_message(
-            "ExecutorInfo has an invalid FrameworkID (Actual: " +
-            stringify(task.executor().framework_id()) + " vs Expected: " +
-            stringify(framework.id()) + ")");
-        update.set_timestamp(Clock::now().secs());
-        update.set_uuid(UUID::random().toBytes());
-
-        statusUpdate(UPID(), update, UPID());
-        continue;
-      }
-
-      TaskInfo copy = task;
-
-      // Set the ExecutorInfo.framework_id if missing.
+    foreach (TaskInfo task, tasks) {
       if (task.has_executor() && !task.executor().has_framework_id()) {
-        copy.mutable_executor()->mutable_framework_id()->CopyFrom(
+        task.mutable_executor()->mutable_framework_id()->CopyFrom(
             framework.id());
       }
-
-      result.push_back(copy);
+      result.push_back(task);
     }
 
     LaunchTasksMessage message;

http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 060039e..fe66432 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -24,6 +24,7 @@
 #include <mesos/scheduler.hpp>
 
 #include <stout/strings.hpp>
+#include <stout/uuid.hpp>
 
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
@@ -99,6 +100,96 @@ TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves)
 }
 
 
+TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Give the executor a random framework id that does not match the
+  // registered framework, so the task is expected to come back LOST.
+  ExecutorInfo executor = DEFAULT_EXECUTOR_INFO;
+  executor.mutable_framework_id()->set_value(UUID::random().toString());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(executor, 1, 1, 16, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(strings::startsWith(
+      status.get().message(), "ExecutorInfo has an invalid FrameworkID"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave> > slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size()); // offers.get()[0] is used below.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Create an invalid task that sets both CommandInfo and ExecutorInfo.
+  TaskInfo task = createTask(offers.get()[0], ""); // Command task.
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); // Executor task.
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+  EXPECT_TRUE(strings::contains(
+      status.get().message(), "CommandInfo or ExecutorInfo present"));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
 TEST_F(ResourceOffersTest, TaskUsesNoResources)
 {
   Try<PID<Master> > master = StartMaster();