You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/25 01:36:45 UTC

[1/3] mesos git commit: Added tests for unset framework pid in the slave.

Repository: mesos
Updated Branches:
  refs/heads/master a9312c237 -> 50696fa2f


Added tests for unset framework pid in the slave.

Since we do not yet have HTTP schedulers, this adds tests
that spoof empty pids coming from the master.

Review: https://reviews.apache.org/r/36761


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50696fa2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50696fa2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50696fa2

Branch: refs/heads/master
Commit: 50696fa2fa04bde042d992c8f9f2bd6020b79f4e
Parents: 9172a5f
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Jul 23 18:07:31 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp       |   8 +-
 src/tests/slave_tests.cpp | 275 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 281 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 1759d7e..7538c96 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1303,7 +1303,6 @@ const ::testing::Matcher<const std::vector<Offer>& > OfferEq(int cpus, int mem)
 }
 
 
-// Definition of the SendStatusUpdateFromTask action to be used with gmock.
 ACTION_P(SendStatusUpdateFromTask, state)
 {
   TaskStatus status;
@@ -1313,7 +1312,6 @@ ACTION_P(SendStatusUpdateFromTask, state)
 }
 
 
-// Definition of the SendStatusUpdateFromTaskID action to be used with gmock.
 ACTION_P(SendStatusUpdateFromTaskID, state)
 {
   TaskStatus status;
@@ -1323,6 +1321,12 @@ ACTION_P(SendStatusUpdateFromTaskID, state)
 }
 
 
+ACTION_P(SendFrameworkMessage, data)
+{
+  arg0->sendFrameworkMessage(data);
+}
+
+
 #define FUTURE_PROTOBUF(message, from, to)              \
   FutureProtobuf(message, from, to)
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/50696fa2/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 64cef6e..e086817 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -2386,6 +2386,281 @@ TEST_F(SlaveTest, CheckpointedResourcesIncludedInUsage)
   Shutdown();
 }
 
+
+// Ensures that the slave correctly handles a framework without
+// a pid, which will be the case for HTTP schedulers. In
+// particular, executor messages should be routed through the
+// master.
+TEST_F(SlaveTest, HTTPScheduler)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Capture the run task message to unset the framework pid.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+  driver.start();
+
+  AWAIT_READY(runTaskMessage);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendFrameworkMessage("message"));
+
+  // The slave should forward the message through the master.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  // The master should then forward the message to the framework.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  // Clear the pid in the run task message so that the slave
+  // thinks this is an HTTP scheduler.
+  RunTaskMessage spoofed = runTaskMessage.get();
+  spoofed.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofed);
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+
+  AWAIT_READY(frameworkMessage);
+
+  // Must call shutdown before the mock executor gets deallocated.
+  Shutdown();
+}
+
+
+// Ensures that the slave correctly handles a framework upgrading
+// to HTTP (going from having a pid, to not having a pid). In
+// particular, executor messages should be routed through the
+// master.
+TEST_F(SlaveTest, HTTPSchedulerLiveUpgrade)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  Future<Nothing> launchTask;
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(FutureSatisfy(&launchTask));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(launchTask);
+
+  // Now spoof a live upgrade of the framework by updating
+  // the framework information to have an empty pid.
+  UpdateFrameworkMessage updateFrameworkMessage;
+  updateFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId.get());
+  updateFrameworkMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), updateFrameworkMessage);
+
+  // Send a message from the executor; the slave should forward
+  // the message through the master.
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  execDriver->sendFrameworkMessage("message");
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+
+  AWAIT_READY(frameworkMessage);
+
+  // Must call shutdown before the mock executor gets deallocated.
+  Shutdown();
+}
+
+
+// Ensures that the slave can restart when there is an empty
+// framework pid. Executor messages should go through the
+// master (instead of directly to the scheduler!).
+TEST_F(SlaveTest, HTTPSchedulerSlaveRestart)
+{
+  Try<PID<Master> > master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher;
+
+  Try<slave::MesosContainerizer*> containerizer =
+    slave::MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(containerizer);
+
+  Try<PID<Slave> > slave = this->StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  // Capture the executor information.
+  Future<Message> registerExecutorMessage =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  SlaveID slaveId = offers.get()[0].slave_id();
+
+  // Capture the run task so that we can unset the framework pid.
+  Future<RunTaskMessage> runTaskMessage =
+    DROP_PROTOBUF(RunTaskMessage(), master.get(), slave.get());
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(runTaskMessage);
+
+  // Clear the pid in the run task message so that the slave
+  // thinks this is an HTTP scheduler.
+  RunTaskMessage spoofedRunTaskMessage = runTaskMessage.get();
+  spoofedRunTaskMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofedRunTaskMessage);
+
+  AWAIT_READY(registerExecutorMessage);
+
+  RegisterExecutorMessage registerExecutor;
+  registerExecutor.ParseFromString(registerExecutorMessage.get().body);
+  ExecutorID executorId = registerExecutor.executor_id();
+  UPID executorPid = registerExecutorMessage.get().from;
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // Restart the slave.
+  Stop(slave.get());
+
+  Try<slave::MesosContainerizer*> containerizer2 =
+    slave::MesosContainerizer::create(flags, true, &fetcher);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+     FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Capture this so that we can unset the framework pid.
+  Future<UpdateFrameworkMessage> updateFrameworkMessage =
+     DROP_PROTOBUF(UpdateFrameworkMessage(), _, _);
+
+  slave = StartSlave(containerizer2.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveReregisteredMessage);
+  AWAIT_READY(updateFrameworkMessage);
+
+  // Make sure the slave sees an empty framework pid after recovery.
+  UpdateFrameworkMessage spoofedUpdateFrameworkMessage =
+    updateFrameworkMessage.get();
+  spoofedUpdateFrameworkMessage.set_pid("");
+
+  process::post(master.get(), slave.get(), spoofedUpdateFrameworkMessage);
+
+  // Spoof a message from the executor, to ensure the slave
+  // sends it through the master (instead of directly to the
+  // scheduler driver!).
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage1 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), slave.get(), master.get());
+
+  Future<ExecutorToFrameworkMessage> executorToFrameworkMessage2 =
+    FUTURE_PROTOBUF(ExecutorToFrameworkMessage(), master.get(), _);
+
+  Future<Nothing> frameworkMessage;
+  EXPECT_CALL(sched, frameworkMessage(&driver, _, _, "message"))
+    .WillOnce(FutureSatisfy(&frameworkMessage));
+
+  ExecutorToFrameworkMessage executorToFrameworkMessage;
+  executorToFrameworkMessage.mutable_slave_id()->CopyFrom(slaveId);
+  executorToFrameworkMessage.mutable_framework_id()->CopyFrom(frameworkId);
+  executorToFrameworkMessage.mutable_executor_id()->CopyFrom(executorId);
+  executorToFrameworkMessage.set_data("message");
+
+  process::post(executorPid, slave.get(), executorToFrameworkMessage);
+
+  AWAIT_READY(executorToFrameworkMessage1);
+  AWAIT_READY(executorToFrameworkMessage2);
+  AWAIT_READY(frameworkMessage);
+
+  driver.stop();
+  driver.join();
+
+  this->Shutdown();
+
+  delete containerizer.get();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[3/3] mesos git commit: Added an ExecutorToFramework message handler on the master.

Posted by bm...@apache.org.
Added an ExecutorToFramework message handler on the master.

This enables the slave to forward messages through the master.

Review: https://reviews.apache.org/r/36759


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac70a594
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac70a594
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac70a594

Branch: refs/heads/master
Commit: ac70a594e9b9700c4cd12eb95a26f8470785a7a9
Parents: a9312c2
Author: Benjamin Mahler <be...@gmail.com>
Authored: Wed Jul 22 17:51:27 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp  | 96 +++++++++++++++++++++++++++++++++++++++------
 src/master/master.hpp  |  7 ++++
 src/master/metrics.cpp | 14 +++++++
 src/master/metrics.hpp |  3 ++
 src/slave/slave.cpp    |  1 -
 5 files changed, 109 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7796630..6d64bfc 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -713,6 +713,16 @@ void Master::initialize()
       &StatusUpdateMessage::update,
       &StatusUpdateMessage::pid);
 
+  // Added in 0.24.0 to support HTTP schedulers. Since
+  // these do not have a pid, the slave must forward
+  // messages through the master.
+  install<ExecutorToFrameworkMessage>(
+      &Master::executorMessage,
+      &ExecutorToFrameworkMessage::slave_id,
+      &ExecutorToFrameworkMessage::framework_id,
+      &ExecutorToFrameworkMessage::executor_id,
+      &ExecutorToFrameworkMessage::data);
+
   install<ReconcileTasksMessage>(
       &Master::reconcileTasks,
       &ReconcileTasksMessage::framework_id,
@@ -3199,24 +3209,24 @@ void Master::schedulerMessage(
     const ExecutorID& executorId,
     const string& data)
 {
-  ++metrics->messages_framework_to_executor;
+  metrics->messages_framework_to_executor++;
 
   Framework* framework = getFramework(frameworkId);
 
   if (framework == NULL) {
-    LOG(WARNING)
-      << "Ignoring framework message for executor " << executorId
-      << " of framework " << frameworkId
-      << " because the framework cannot be found";
+    LOG(WARNING) << "Ignoring framework message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " because the framework cannot be found";
     metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
   if (from != framework->pid) {
-    LOG(WARNING)
-      << "Ignoring framework message for executor " << executorId
-      << " of framework " << *framework
-      << " because it is not expected from " << from;
+    LOG(WARNING) << "Ignoring framework message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << *framework
+                 << " because it is not expected from " << from;
     metrics->invalid_framework_to_executor_messages++;
     return;
   }
@@ -3230,6 +3240,69 @@ void Master::schedulerMessage(
 }
 
 
+void Master::executorMessage(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const string& data)
+{
+  metrics->messages_executor_to_framework++;
+
+  if (slaves.removed.get(slaveId).isSome()) {
+    // If the slave is removed, we have already informed
+    // frameworks that its tasks were LOST, so the slave
+    // should shut down.
+    LOG(WARNING) << "Ignoring executor message"
+                 << " from executor" << " '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on removed slave " << slaveId
+                 << " ; asking slave to shutdown";
+
+    ShutdownMessage message;
+    message.set_message("Executor message from unknown slave");
+    reply(message);
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  // The slave should (re-)register with the master before
+  // forwarding executor messages.
+  if (!slaves.registered.contains(slaveId)) {
+    LOG(WARNING) << "Ignoring executor message"
+                 << " from executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on unknown slave " << slaveId;
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveId);
+  CHECK_NOTNULL(slave);
+
+  Framework* framework = getFramework(frameworkId);
+
+  if (framework == NULL) {
+    LOG(WARNING) << "Not forwarding executor message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on slave " << *slave
+                 << " because the framework is unknown";
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  ExecutorToFrameworkMessage message;
+  message.mutable_slave_id()->MergeFrom(slaveId);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_data(data);
+  send(framework->pid, message);
+
+  metrics->valid_executor_to_framework_messages++;
+}
+
+
 void Master::message(
     Framework* framework,
     const scheduler::Call::Message& message)
@@ -3850,8 +3923,6 @@ void Master::exitedExecutor(
     return;
   }
 
-  // Only update master's internal data structures here for proper
-  // accounting. The TASK_LOST updates are handled by the slave.
   if (!slaves.registered.contains(slaveId)) {
     LOG(WARNING) << "Ignoring exited executor '" << executorId
                  << "' of framework " << frameworkId
@@ -3859,6 +3930,9 @@ void Master::exitedExecutor(
     return;
   }
 
+  // Only update master's internal data structures here for proper
+  // accounting. The TASK_LOST updates are handled by the slave.
+
   Slave* slave = slaves.registered.get(slaveId);
   CHECK_NOTNULL(slave);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 29113cb..827d0d5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -716,6 +716,13 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
+  void executorMessage(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const std::string& data);
+
   void registerSlave(
       const process::UPID& from,
       const SlaveInfo& slaveInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 10e2937..d79206f 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -118,6 +118,8 @@ Metrics::Metrics(const Master& master)
         "master/messages_reconcile_tasks"),
     messages_framework_to_executor(
         "master/messages_framework_to_executor"),
+    messages_executor_to_framework(
+        "master/messages_executor_to_framework"),
     messages_register_slave(
         "master/messages_register_slave"),
     messages_reregister_slave(
@@ -136,6 +138,10 @@ Metrics::Metrics(const Master& master)
         "master/valid_framework_to_executor_messages"),
     invalid_framework_to_executor_messages(
         "master/invalid_framework_to_executor_messages"),
+    valid_executor_to_framework_messages(
+        "master/valid_executor_to_framework_messages"),
+    invalid_executor_to_framework_messages(
+        "master/invalid_executor_to_framework_messages"),
     valid_status_updates(
         "master/valid_status_updates"),
     invalid_status_updates(
@@ -214,6 +220,7 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(messages_revive_offers);
   process::metrics::add(messages_reconcile_tasks);
   process::metrics::add(messages_framework_to_executor);
+  process::metrics::add(messages_executor_to_framework);
 
   // Messages from slaves.
   process::metrics::add(messages_register_slave);
@@ -229,6 +236,9 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(valid_framework_to_executor_messages);
   process::metrics::add(invalid_framework_to_executor_messages);
 
+  process::metrics::add(valid_executor_to_framework_messages);
+  process::metrics::add(invalid_executor_to_framework_messages);
+
   process::metrics::add(valid_status_updates);
   process::metrics::add(invalid_status_updates);
 
@@ -345,6 +355,7 @@ Metrics::~Metrics()
   process::metrics::remove(messages_revive_offers);
   process::metrics::remove(messages_reconcile_tasks);
   process::metrics::remove(messages_framework_to_executor);
+  process::metrics::remove(messages_executor_to_framework);
 
   // Messages from slaves.
   process::metrics::remove(messages_register_slave);
@@ -360,6 +371,9 @@ Metrics::~Metrics()
   process::metrics::remove(valid_framework_to_executor_messages);
   process::metrics::remove(invalid_framework_to_executor_messages);
 
+  process::metrics::remove(valid_executor_to_framework_messages);
+  process::metrics::remove(invalid_executor_to_framework_messages);
+
   process::metrics::remove(valid_status_updates);
   process::metrics::remove(invalid_status_updates);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index d37d74a..5e96a5f 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -131,6 +131,7 @@ struct Metrics
   process::metrics::Counter messages_revive_offers;
   process::metrics::Counter messages_reconcile_tasks;
   process::metrics::Counter messages_framework_to_executor;
+  process::metrics::Counter messages_executor_to_framework;
 
   // Messages from slaves.
   process::metrics::Counter messages_register_slave;
@@ -145,6 +146,8 @@ struct Metrics
 
   process::metrics::Counter valid_framework_to_executor_messages;
   process::metrics::Counter invalid_framework_to_executor_messages;
+  process::metrics::Counter valid_executor_to_framework_messages;
+  process::metrics::Counter invalid_executor_to_framework_messages;
 
   process::metrics::Counter valid_status_updates;
   process::metrics::Counter invalid_status_updates;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index dc12c45..784fdc8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2989,7 +2989,6 @@ void Slave::executorMessage(
     return;
   }
 
-
   LOG(INFO) << "Sending message for framework " << frameworkId
             << " to " << framework->pid;
 


[2/3] mesos git commit: Handle scheduler pid as optional in the slave.

Posted by bm...@apache.org.
Handle scheduler pid as optional in the slave.

This is in anticipation of HTTP scheduler support in 0.24.0.
Note that the 'pid' is set for driver-based schedulers. The
corresponding master changes to not set 'pid' for HTTP
schedulers have not occurred yet.

Review: https://reviews.apache.org/r/36760


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9172a5f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9172a5f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9172a5f5

Branch: refs/heads/master
Commit: 9172a5f50bc26c2bd88ff7382a0b5f0ccaf73b14
Parents: ac70a59
Author: Benjamin Mahler <be...@gmail.com>
Authored: Thu Jul 23 15:17:22 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp       |  5 +-
 src/messages/messages.proto | 13 +++++-
 src/slave/slave.cpp         | 99 ++++++++++++++++++++++++++++++----------
 src/slave/slave.hpp         | 16 +++++--
 src/slave/state.hpp         |  4 ++
 src/tests/mesos.cpp         |  7 ++-
 src/tests/mesos.hpp         |  8 ++--
 src/tests/slave_tests.cpp   |  5 +-
 8 files changed, 111 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6d64bfc..613a011 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5042,8 +5042,9 @@ void Master::addSlave(
   // TODO(vinod): Reconcile the notion of a completed framework across the
   // master and slave.
   foreach (const Archive::Framework& completedFramework, completedFrameworks) {
-    const FrameworkID& frameworkId = completedFramework.framework_info().id();
-    Framework* framework = getFramework(frameworkId);
+    Framework* framework = getFramework(
+        completedFramework.framework_info().id());
+
     foreach (const Task& task, completedFramework.tasks()) {
       if (framework != NULL) {
         VLOG(2) << "Re-adding completed task " << task.task_id()

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 165a16d..8977d8e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -193,8 +193,15 @@ message RunTaskMessage {
   // TODO(karya): Remove framework_id after MESOS-2559 has shipped.
   optional FrameworkID framework_id = 1 [deprecated = true];
   required FrameworkInfo framework = 2;
-  required string pid = 3;
   required TaskInfo task = 4;
+
+  // The pid of the framework. This was moved to 'optional' in
+  // 0.24.0 to support schedulers using the HTTP API. For now, we
+  // continue to always set pid since it was required in 0.23.x.
+  // When 'pid' is unset, or set to empty string, the slave will
+  // forward executor messages through the master. For schedulers
+  // still using the driver, this will remain set.
+  optional string pid = 3;
 }
 
 
@@ -335,7 +342,9 @@ message ShutdownExecutorMessage {
 
 message UpdateFrameworkMessage {
   required FrameworkID framework_id = 1;
-  required string pid = 2;
+
+  // See the comment on RunTaskMessage.pid.
+  optional string pid = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 784fdc8..4ba95f9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1165,13 +1165,16 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
       VLOG(1) << "Reregistering completed framework "
                 << completedFramework->id();
+
       Archive::Framework* completedFramework_ =
         message.add_completed_frameworks();
-      FrameworkInfo* frameworkInfo =
-        completedFramework_->mutable_framework_info();
-      frameworkInfo->CopyFrom(completedFramework->info);
 
-      completedFramework_->set_pid(completedFramework->pid);
+      completedFramework_->mutable_framework_info()->CopyFrom(
+          completedFramework->info);
+
+      if (completedFramework->pid.isSome()) {
+        completedFramework_->set_pid(completedFramework->pid.get());
+      }
 
       foreach (const Owned<Executor>& executor,
                completedFramework->completedExecutors) {
@@ -1179,10 +1182,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
                 << " with " << executor->terminatedTasks.size()
                 << " terminated tasks, " << executor->completedTasks.size()
                 << " completed tasks";
+
         foreach (const Task* task, executor->terminatedTasks.values()) {
           VLOG(2) << "Reregistering terminated task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
         }
+
         foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
           VLOG(2) << "Reregistering completed task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
@@ -1222,7 +1227,7 @@ void Slave::runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo_,
     const FrameworkID& frameworkId_,
-    const string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   if (master != from) {
@@ -1291,7 +1296,13 @@ void Slave::runTask(
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
 
-    framework = new Framework(this, frameworkInfo, pid);
+    Option<UPID> frameworkPid = None();
+
+    if (pid != UPID()) {
+      frameworkPid = pid;
+    }
+
+    framework = new Framework(this, frameworkInfo, frameworkPid);
     frameworks[frameworkId] = framework;
 
     // Is this same framework in completedFrameworks? If so, move the completed
@@ -1340,14 +1351,13 @@ void Slave::runTask(
 
   // Run the task after the unschedules are done.
   unschedule.onAny(
-      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
+      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
 }
 
 
 void Slave::_runTask(
     const Future<bool>& future,
     const FrameworkInfo& frameworkInfo,
-    const string& pid,
     const TaskInfo& task)
 {
   const FrameworkID frameworkId = frameworkInfo.id();
@@ -1733,8 +1743,12 @@ void Slave::runTasks(
     RunTaskMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_framework()->MergeFrom(framework->info);
-    message.set_pid(framework->pid);
     message.mutable_task()->MergeFrom(task);
+
+    // Note that 0.23.x executors require the 'pid' to be set
+    // to decode the message, but do not use the field.
+    message.set_pid(framework->pid.getOrElse(UPID()));
+
     send(executor->pid, message);
   }
 }
@@ -2087,7 +2101,9 @@ void Slave::schedulerMessage(
 }
 
 
-void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
+void Slave::updateFramework(
+    const FrameworkID& frameworkId,
+    const UPID& pid)
 {
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
@@ -2115,15 +2131,25 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
     case Framework::RUNNING: {
       LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
 
-      framework->pid = pid;
+      if (pid == UPID()) {
+        framework->pid = None();
+      } else {
+        framework->pid = pid;
+      }
+
       if (framework->info.checkpoint()) {
-        // Checkpoint the framework pid.
+        // Checkpoint the framework pid, note that when the 'pid'
+        // is None, we checkpoint a default UPID() because
+        // 0.23.x slaves consider a missing pid file to be an
+        // error.
         const string path = paths::getFrameworkPidPath(
             metaDir, info.id(), frameworkId);
 
-        VLOG(1) << "Checkpointing framework pid '"
-                << framework->pid << "' to '" << path << "'";
-        CHECK_SOME(state::checkpoint(path, framework->pid));
+        VLOG(1) << "Checkpointing framework pid"
+                << " '" << framework->pid.getOrElse(UPID()) << "'"
+                << " to '" << path << "'";
+
+        CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
       }
 
       // Inform status update manager to immediately resend any pending
@@ -2989,15 +3015,23 @@ void Slave::executorMessage(
     return;
   }
 
-  LOG(INFO) << "Sending message for framework " << frameworkId
-            << " to " << framework->pid;
-
   ExecutorToFrameworkMessage message;
   message.mutable_slave_id()->MergeFrom(slaveId);
   message.mutable_framework_id()->MergeFrom(frameworkId);
   message.mutable_executor_id()->MergeFrom(executorId);
   message.set_data(data);
-  send(framework->pid, message);
+
+  CHECK_SOME(master);
+
+  if (framework->pid.isSome()) {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " to " << framework->pid.get();
+    send(framework->pid.get(), message);
+  } else {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " through the master " << master.get();
+    send(master.get(), message);
+  }
 
   metrics.valid_framework_messages++;
 }
@@ -4142,8 +4176,17 @@ void Slave::recoverFramework(const FrameworkState& state)
     CHECK_EQ(frameworkInfo.id(), state.id);
   }
 
+  // In 0.24.0, HTTP schedulers are supported and these do not
+  // have a 'pid'. In this case, the slave will checkpoint UPID().
   CHECK_SOME(state.pid);
-  Framework* framework = new Framework(this, frameworkInfo, state.pid.get());
+
+  Option<UPID> pid = state.pid.get();
+
+  if (pid.get() == UPID()) {
+    pid = None();
+  }
+
+  Framework* framework = new Framework(this, frameworkInfo, pid);
   frameworks[framework->id()] = framework;
 
   // Now recover the executors for this framework.
@@ -4662,7 +4705,7 @@ double Slave::_resources_revocable_percent(const string& name)
 Framework::Framework(
     Slave* _slave,
     const FrameworkInfo& _info,
-    const UPID& _pid)
+    const Option<UPID>& _pid)
   : state(RUNNING),
     slave(_slave),
     info(_info),
@@ -4675,15 +4718,21 @@ Framework::Framework(
         slave->metaDir, slave->info.id(), id());
 
     VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'";
+
     CHECK_SOME(state::checkpoint(path, info));
 
-    // Checkpoint the framework pid.
+    // Checkpoint the framework pid. Note that we checkpoint a
+    // UPID() when it is None (for HTTP schedulers) because
+    // 0.23.x slaves consider a missing pid file to be an
+    // error.
     path = paths::getFrameworkPidPath(
         slave->metaDir, slave->info.id(), id());
 
-    VLOG(1) << "Checkpointing framework pid '"
-            << pid << "' to '" << path << "'";
-    CHECK_SOME(state::checkpoint(path, pid));
+    VLOG(1) << "Checkpointing framework pid"
+            << " '" << pid.getOrElse(UPID()) << "'"
+            << " to '" << path << "'";
+
+    CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID())));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dec4ca8..41d0949 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -117,14 +117,13 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
   // Made 'virtual' for Slave mocking.
   virtual void _runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   process::Future<bool> unschedule(const std::string& path);
@@ -150,7 +149,9 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
-  void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
+  void updateFramework(
+      const FrameworkID& frameworkId,
+      const process::UPID& pid);
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
@@ -634,7 +635,7 @@ struct Framework
   Framework(
       Slave* slave,
       const FrameworkInfo& info,
-      const process::UPID& pid);
+      const Option<process::UPID>& pid);
 
   ~Framework();
 
@@ -660,7 +661,12 @@ struct Framework
 
   const FrameworkInfo info;
 
-  UPID pid;
+  // Frameworks using the scheduler driver will have a 'pid',
+  // which allows us to send executor messages directly to the
+  // driver. Frameworks using the HTTP API (in 0.24.0) will
+  // not have a 'pid', in which case executor messages are
+  // sent through the master.
+  Option<UPID> pid;
 
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 4e00468..cecf200 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -248,7 +248,11 @@ struct FrameworkState
 
   FrameworkID id;
   Option<FrameworkInfo> info;
+
+  // Note that HTTP frameworks (supported in 0.24.0) do not have a
+  // PID, in which case 'pid' is Some(UPID()) rather than None().
   Option<process::UPID> pid;
+
   hashmap<ExecutorID, ExecutorState> executors;
   unsigned int errors;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index f09ef0f..f3b7315 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -462,7 +462,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _runTask(_, _, _, _))
+  EXPECT_CALL(*this, _runTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
   EXPECT_CALL(*this, killTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -485,7 +485,7 @@ void MockSlave::unmocked_runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const FrameworkID& frameworkId,
-    const std::string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
@@ -495,10 +495,9 @@ void MockSlave::unmocked_runTask(
 void MockSlave::unmocked__runTask(
       const Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task)
 {
-  slave::Slave::_runTask(future, frameworkInfo, pid, task);
+  slave::Slave::_runTask(future, frameworkInfo, task);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8a76b4f..1759d7e 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -811,26 +811,24 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task));
 
   void unmocked_runTask(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
-  MOCK_METHOD4(_runTask, void(
+  MOCK_METHOD3(_runTask, void(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task));
 
   void unmocked__runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   MOCK_METHOD3(killTask, void(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b145d76..64cef6e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1793,7 +1793,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _runTask;
-  EXPECT_CALL(slave, _runTask(_, _, _, _))
+  EXPECT_CALL(slave, _runTask(_, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_runTask),
                     SaveArg<0>(&future),
                     SaveArg<1>(&frameworkInfo)));
@@ -1818,8 +1818,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   driver.killTask(task.task_id());
 
   AWAIT_READY(killTask);
-  slave.unmocked__runTask(
-      future, frameworkInfo, master.get(), task);
+  slave.unmocked__runTask(future, frameworkInfo, task);
 
   AWAIT_READY(removeFramework);