You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/07/28 21:58:38 UTC

mesos git commit: Updated Framework struct in master for the http api.

Repository: mesos
Updated Branches:
  refs/heads/master 4b4cba24d -> 90b107a24


Updated Framework struct in master for the http api.

This change refactors the Framework struct in master to introduce
support for http frameworks:
  * 'pid' becomes an optional field.
  * Added optional 'http' field.

Review: https://reviews.apache.org/r/36318


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/90b107a2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/90b107a2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/90b107a2

Branch: refs/heads/master
Commit: 90b107a249169c6fc8b8d398b675ab9bd2df633b
Parents: 4b4cba2
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jul 28 11:53:45 2015 -0700
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Jul 28 12:19:31 2015 -0700

----------------------------------------------------------------------
 src/common/http.hpp   |   8 ++
 src/master/http.cpp   |   6 +-
 src/master/master.cpp | 183 +++++++++++++++++++++++++++------------------
 src/master/master.hpp | 105 ++++++++++++++++++++++++--
 4 files changed, 221 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/common/http.hpp
----------------------------------------------------------------------
diff --git a/src/common/http.hpp b/src/common/http.hpp
index 765860f..9e4290f 100644
--- a/src/common/http.hpp
+++ b/src/common/http.hpp
@@ -38,6 +38,14 @@ class Task;
 extern const char APPLICATION_JSON[];
 extern const char APPLICATION_PROTOBUF[];
 
+// Possible content-types that can be used as responses for
+// the mesos Http API.
+enum class ContentType
+{
+  PROTOBUF,
+  JSON
+};
+
 JSON::Object model(const Resources& resources);
 JSON::Object model(const hashmap<std::string, Resources>& roleResources);
 JSON::Object model(const Attributes& attributes);

http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 3a1598f..3772e39 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -113,7 +113,11 @@ JSON::Object summarize(const Framework& framework)
   JSON::Object object;
   object.values["id"] = framework.id().value();
   object.values["name"] = framework.info.name();
-  object.values["pid"] = string(framework.pid);
+
+  // Omit pid for http frameworks.
+  if (framework.pid.isSome()) {
+    object.values["pid"] = string(framework.pid.get());
+  }
 
   // TODO(bmahler): Use these in the webui.
   object.values["used_resources"] = model(framework.totalUsedResources);

http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a8a195d..3e63184 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1659,7 +1659,7 @@ void Master::receive(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     drop(from, call, "Call is not from registered framework");
     return;
   }
@@ -1844,41 +1844,41 @@ void Master::_registerFramework(
       FrameworkRegisteredMessage message;
       message.mutable_framework_id()->MergeFrom(framework->id());
       message.mutable_master_info()->MergeFrom(info_);
-      send(from, message);
+      framework->send(message);
       return;
     }
   }
 
+  // TODO(vinod): Deprecate this in favor of authorization.
+  bool rootSubmissions = flags.root_submissions;
+
+  if (frameworkInfo.user() == "root" && rootSubmissions == false) {
+    LOG(INFO) << "Framework " << frameworkInfo.name() << " at " << from
+              << " registering as root, but root submissions are disabled"
+              << " on this cluster";
+    FrameworkErrorMessage message;
+    message.set_message("User 'root' is not allowed to run frameworks");
+    send(from, message);
+    return;
+  }
+
   // Assign a new FrameworkID.
   FrameworkInfo frameworkInfo_ = frameworkInfo;
   frameworkInfo_.mutable_id()->CopyFrom(newFrameworkId());
 
-  Framework* framework = new Framework(frameworkInfo_, from);
+  Framework* framework = new Framework(this, frameworkInfo_, from);
 
   LOG(INFO) << "Registering framework " << *framework
             << " with checkpointing "
             << (framework->info.checkpoint() ? "enabled" : "disabled")
             << " and capabilities " << framework->info.capabilities();
 
-  // TODO(vinod): Deprecate this in favor of authorization.
-  bool rootSubmissions = flags.root_submissions;
-
-  if (framework->info.user() == "root" && rootSubmissions == false) {
-    LOG(INFO) << "Framework " << *framework << " registering as root, but "
-              << "root submissions are disabled on this cluster";
-    FrameworkErrorMessage message;
-    message.set_message("User 'root' is not allowed to run frameworks");
-    send(from, message);
-    delete framework;
-    return;
-  }
-
   addFramework(framework);
 
   FrameworkRegisteredMessage message;
   message.mutable_framework_id()->MergeFrom(framework->id());
   message.mutable_master_info()->MergeFrom(info_);
-  send(framework->pid, message);
+  framework->send(message);
 }
 
 
@@ -1952,6 +1952,7 @@ void Master::_reregisterFramework(
     const Future<Option<Error>>& validationError)
 {
   CHECK_READY(validationError);
+
   if (validationError.get().isSome()) {
     LOG(INFO) << "Refusing re-registration of framework " << frameworkInfo.id()
               << " (" << frameworkInfo.name() << ") " << " at " << from
@@ -2025,7 +2026,7 @@ void Master::_reregisterFramework(
       // info?
       LOG(INFO) << "Framework " << *framework << " failed over";
       failoverFramework(framework, from);
-    } else if (from != framework->pid) {
+    } else if (framework->pid != from) {
       LOG(ERROR)
         << "Disallowing re-registration attempt of framework " << *framework
         << " because it is not expected from " << from;
@@ -2063,7 +2064,7 @@ void Master::_reregisterFramework(
       FrameworkReregisteredMessage message;
       message.mutable_framework_id()->MergeFrom(frameworkInfo.id());
       message.mutable_master_info()->MergeFrom(info_);
-      send(from, message);
+      framework->send(message);
       return;
     }
   } else {
@@ -2071,7 +2072,7 @@ void Master::_reregisterFramework(
     // elected Mesos master to which either an existing scheduler or a
     // failed-over one is connecting. Create a Framework object and add
     // any tasks it has that have been reported by reconnecting slaves.
-    Framework* framework = new Framework(frameworkInfo, from);
+    Framework* framework = new Framework(this, frameworkInfo, from);
 
     // TODO(benh): Check for root submissions like above!
 
@@ -2098,7 +2099,7 @@ void Master::_reregisterFramework(
     FrameworkRegisteredMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_master_info()->MergeFrom(info_);
-    send(framework->pid, message);
+    framework->send(message);
   }
 
   CHECK(frameworks.registered.contains(frameworkInfo.id()))
@@ -2172,7 +2173,7 @@ void Master::deactivateFramework(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring deactivate framework message for framework " << *framework
       << " because it is not expected from " << from;
@@ -2191,9 +2192,17 @@ void Master::disconnect(Framework* framework)
 
   framework->connected = false;
 
-  // Remove the framework from authenticated. This is safe because
-  // a framework will always reauthenticate before (re-)registering.
-  authenticated.erase(framework->pid);
+  if (framework->pid.isSome()) {
+    // Remove the framework from authenticated. This is safe because
+    // a framework will always reauthenticate before (re-)registering.
+    authenticated.erase(framework->pid.get());
+  } else {
+    CHECK_SOME(framework->http);
+
+    // Close the HTTP connection, which may already have
+    // been closed due to scheduler disconnection.
+    framework->http.get().writer.close();
+  }
 
   deactivate(framework);
 }
@@ -2275,7 +2284,7 @@ void Master::resourceRequest(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring resource request message from framework " << *framework
       << " because it is not expected from " << from;
@@ -2329,12 +2338,11 @@ void Master::launchTasks(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring launch tasks message for offers " << stringify(offerIds)
-      << " of framework " << frameworkId << " from '" << from
-      << "' because it is not from the registered framework '"
-      << framework->pid << "'";
+      << " from '" << from << "' because it is not from the"
+      << " registered framework " << *framework;
 
     return;
   }
@@ -2882,7 +2890,11 @@ void Master::_accept(
             RunTaskMessage message;
             message.mutable_framework()->MergeFrom(framework->info);
             message.mutable_framework_id()->MergeFrom(framework->id());
-            message.set_pid(framework->pid);
+
+            // TODO(anand): We set 'pid' to UPID() for http frameworks
+            // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+            // no longer have to set pid here for http frameworks.
+            message.set_pid(framework->pid.getOrElse(UPID()));
             message.mutable_task()->MergeFrom(task_);
 
             if (HookManager::hooksAvailable()) {
@@ -2958,7 +2970,7 @@ void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring revive offers message for framework " << *framework
       << " because it is not expected from " << from;
@@ -2997,7 +3009,7 @@ void Master::killTask(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring kill task message for task " << taskId << " of framework "
       << *framework << " because it is not expected from " << from;
@@ -3117,7 +3129,7 @@ void Master::statusUpdateAcknowledgement(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid)
       << " for task " << taskId << " of framework " << *framework
@@ -3232,11 +3244,11 @@ void Master::schedulerMessage(
     return;
   }
 
-  if (from != framework->pid) {
-    LOG(WARNING) << "Ignoring framework message"
-                 << " for executor '" << executorId << "'"
-                 << " of framework " << *framework
-                 << " because it is not expected from " << from;
+  if (framework->pid != from) {
+    LOG(WARNING)
+      << "Ignoring framework message for executor " << executorId
+      << " of framework " << *framework
+      << " because it is not expected from " << from;
     metrics->invalid_framework_to_executor_messages++;
     return;
   }
@@ -3307,7 +3319,8 @@ void Master::executorMessage(
   message.mutable_framework_id()->MergeFrom(frameworkId);
   message.mutable_executor_id()->MergeFrom(executorId);
   message.set_data(data);
-  send(framework->pid, message);
+
+  framework->send(message);
 
   metrics->valid_executor_to_framework_messages++;
 }
@@ -3712,16 +3725,23 @@ void Master::__reregisterSlave(Slave* slave, const vector<Task>& tasks)
   CHECK_NOTNULL(slave);
 
   // Send the latest framework pids to the slave.
-  hashset<UPID> pids;
+  hashset<FrameworkID> ids;
+
   foreach (const Task& task, tasks) {
     Framework* framework = getFramework(task.framework_id());
-    if (framework != NULL && !pids.contains(framework->pid)) {
+
+    if (framework != NULL && !ids.contains(framework->id())) {
       UpdateFrameworkMessage message;
       message.mutable_framework_id()->MergeFrom(framework->id());
-      message.set_pid(framework->pid);
+
+      // TODO(anand): We set 'pid' to UPID() for http frameworks
+      // as 'pid' was made optional in 0.24.0. In 0.25.0, we
+      // no longer have to set pid here for http frameworks.
+      message.set_pid(framework->pid.getOrElse(UPID()));
+
       send(slave->pid, message);
 
-      pids.insert(framework->pid);
+      ids.insert(framework->id());
     }
   }
 
@@ -3905,7 +3925,7 @@ void Master::forward(
   StatusUpdateMessage message;
   message.mutable_update()->MergeFrom(update);
   message.set_pid(acknowledgee);
-  send(framework->pid, message);
+  framework->send(message);
 }
 
 
@@ -3977,7 +3997,7 @@ void Master::exitedExecutor(
   message.mutable_slave_id()->CopyFrom(slaveId);
   message.set_status(status);
 
-  send(framework->pid, message);
+  framework->send(message);
 }
 
 
@@ -4064,7 +4084,7 @@ void Master::reconcileTasks(
     return;
   }
 
-  if (from != framework->pid) {
+  if (framework->pid != from) {
     LOG(WARNING)
       << "Ignoring reconcile tasks message for framework " << *framework
       << " because it is not expected from " << from;
@@ -4106,7 +4126,7 @@ void Master::_reconcileTasks(
       // much logging.
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
-      send(framework->pid, message);
+      framework->send(message);
     }
 
     foreachvalue (Task* task, framework->tasks) {
@@ -4139,7 +4159,7 @@ void Master::_reconcileTasks(
       // much logging.
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update);
-      send(framework->pid, message);
+      framework->send(message);
     }
 
     return;
@@ -4243,7 +4263,7 @@ void Master::_reconcileTasks(
       // much logging.
       StatusUpdateMessage message;
       message.mutable_update()->CopyFrom(update.get());
-      send(framework->pid, message);
+      framework->send(message);
     }
   }
 }
@@ -4405,7 +4425,7 @@ void Master::offer(const FrameworkID& frameworkId,
   LOG(INFO) << "Sending " << message.offers().size()
             << " offers to framework " << *framework;
 
-  send(framework->pid, message);
+  framework->send(message);
 }
 
 
@@ -4718,9 +4738,15 @@ void Master::addFramework(Framework* framework)
   CHECK(!frameworks.registered.contains(framework->id()))
     << "Framework " << *framework << " already exists!";
 
+  CHECK_SOME(framework->pid) << "adding http framework not implemented";
+
   frameworks.registered[framework->id()] = framework;
 
-  link(framework->pid);
+  link(framework->pid.get());
+
+  // TODO(anand): For http frameworks, add a readerClosed()
+  // callback to invoke Master::exited() when the connection
+  // closes.
 
   // Enforced by Master::registerFramework.
   CHECK(roles.contains(framework->info.role()))
@@ -4742,13 +4768,13 @@ void Master::addFramework(Framework* framework)
   // If the framework is authenticated, its principal should be in
   // 'authenticated'. Otherwise look if it's supplied in
   // FrameworkInfo.
-  Option<string> principal = authenticated.get(framework->pid);
+  Option<string> principal = authenticated.get(framework->pid.get());
   if (principal.isNone() && framework->info.has_principal()) {
     principal = framework->info.principal();
   }
 
-  CHECK(!frameworks.principals.contains(framework->pid));
-  frameworks.principals.put(framework->pid, principal);
+  CHECK(!frameworks.principals.contains(framework->pid.get()));
+  frameworks.principals.put(framework->pid.get(), principal);
 
   // Export framework metrics if a principal is specified.
   if (principal.isSome()) {
@@ -4767,7 +4793,9 @@ void Master::addFramework(Framework* framework)
 // event of a scheduler failover.
 void Master::failoverFramework(Framework* framework, const UPID& newPid)
 {
-  const UPID oldPid = framework->pid;
+  CHECK_SOME(framework->pid) << "http framework failover not implemented";
+
+  const UPID oldPid = framework->pid.get();
 
   // There are a few failover cases to consider:
   //   1. The pid has changed. In this case we definitely want to
@@ -4795,7 +4823,7 @@ void Master::failoverFramework(Framework* framework, const UPID& newPid)
   FrameworkRegisteredMessage message;
   message.mutable_framework_id()->MergeFrom(framework->id());
   message.mutable_master_info()->MergeFrom(info_);
-  send(newPid, message);
+  framework->send(message);
 
   // Remove the framework's offers (if they weren't removed before).
   // We do this after we have updated the pid and sent the framework
@@ -4912,6 +4940,8 @@ void Master::removeFramework(Framework* framework)
 
   // TODO(benh): unlink(framework->pid);
 
+  // TODO(anand): For http frameworks, close the connection.
+
   framework->unregisteredTime = Clock::now();
 
   // The completedFramework buffer now owns the framework pointer.
@@ -4923,21 +4953,26 @@ void Master::removeFramework(Framework* framework)
 
   roles[framework->info.role()]->removeFramework(framework);
 
-  // Remove the framework from authenticated.
-  authenticated.erase(framework->pid);
-
-  CHECK(frameworks.principals.contains(framework->pid));
-  const Option<string> principal = frameworks.principals[framework->pid];
-
-  frameworks.principals.erase(framework->pid);
-
-  // Remove the framework's message counters.
-  if (principal.isSome()) {
-    // Remove the metrics for the principal if this framework is the
-    // last one with this principal.
-    if (!frameworks.principals.containsValue(principal.get())) {
-      CHECK(metrics->frameworks.contains(principal.get()));
-      metrics->frameworks.erase(principal.get());
+  // TODO(anand): This only works for pid based frameworks. We would
+  // need similar authentication logic for http frameworks.
+  if (framework->pid.isSome()) {
+    // Remove the framework from authenticated.
+    authenticated.erase(framework->pid.get());
+
+    CHECK(frameworks.principals.contains(framework->pid.get()));
+    const Option<string> principal =
+      frameworks.principals[framework->pid.get()];
+
+    frameworks.principals.erase(framework->pid.get());
+
+    // Remove the framework's message counters.
+    if (principal.isSome()) {
+      // Remove the metrics for the principal if this framework is the
+      // last one with this principal.
+      if (!frameworks.principals.containsValue(principal.get())) {
+        CHECK(metrics->frameworks.contains(principal.get()));
+        metrics->frameworks.erase(principal.get());
+      }
     }
   }
 
@@ -5220,7 +5255,7 @@ void Master::_removeSlave(
               << "after recovering";
     LostSlaveMessage message;
     message.mutable_slave_id()->MergeFrom(slaveInfo.id());
-    send(framework->pid, message);
+    framework->send(message);
   }
 }
 
@@ -5465,7 +5500,7 @@ void Master::removeOffer(Offer* offer, bool rescind)
   if (rescind) {
     RescindResourceOfferMessage message;
     message.mutable_offer_id()->MergeFrom(offer->id());
-    send(framework->pid, message);
+    framework->send(message);
   }
 
   // Remove and cancel offer removal timers. Canceling the Timers is

http://git-wip-us.apache.org/repos/asf/mesos/blob/90b107a2/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2c924ad..879e3d8 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -52,7 +52,9 @@
 #include <stout/hashset.hpp>
 #include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
+#include <stout/recordio.hpp>
 
+#include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
 #include "common/resources_utils.hpp"
 
@@ -830,6 +832,7 @@ private:
   Master(const Master&);              // No copying.
   Master& operator = (const Master&); // No assigning.
 
+  friend struct Framework;
   friend struct Metrics;
 
   // NOTE: Since 'getOffer' and 'slaves' are protected,
@@ -1211,15 +1214,22 @@ private:
 };
 
 
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const Framework& framework);
+
+
 // Information about a connected or completed framework.
 // TODO(bmahler): Keeping the task and executor information in sync
 // across the Slave and Framework structs is error prone!
 struct Framework
 {
-  Framework(const FrameworkInfo& _info,
+  Framework(Master* const _master,
+            const FrameworkInfo& _info,
             const process::UPID& _pid,
             const process::Time& time = process::Clock::now())
-    : info(_info),
+    : master(_master),
+      info(_info),
       pid(_pid),
       connected(true),
       active(true),
@@ -1227,7 +1237,49 @@ struct Framework
       reregisteredTime(time),
       completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK) {}
 
-  ~Framework() {}
+  Framework(Master* const _master,
+            const FrameworkInfo& _info,
+            const process::http::Pipe::Writer& writer,
+            ContentType contentType,
+            const process::Time& time = process::Clock::now())
+    : master(_master),
+      info(_info),
+      connected(true),
+      active(true),
+      registeredTime(time),
+      reregisteredTime(time),
+      completedTasks(MAX_COMPLETED_TASKS_PER_FRAMEWORK)
+  {
+    // TODO(anand): This logic needs to be invoked each
+    // time the framework connects via http. Move it to
+    // a method instead, that gets invoked from
+    // addFramework and failoverFramework.
+
+    auto serialize = [contentType](const scheduler::Event& event) {
+      switch (contentType) {
+        case ContentType::PROTOBUF: {
+          return event.SerializeAsString();
+        }
+        case ContentType::JSON: {
+          JSON::Object object = JSON::Protobuf(event);
+          return stringify(object);
+        }
+      }
+    };
+
+    auto encoder = recordio::Encoder<scheduler::Event>(serialize);
+
+    http = Http {writer, encoder};
+  }
+
+  ~Framework()
+  {
+    if (http.isSome() && connected) {
+      if (!http.get().writer.close()) {
+        LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
+      }
+    }
+  }
 
   Task* getTask(const TaskID& taskId)
   {
@@ -1270,6 +1322,29 @@ struct Framework
     }
   }
 
+  // Sends a message to the connected framework.
+  template <typename Message>
+  void send(const Message& message)
+  {
+    if (!connected) {
+      LOG(WARNING) << "Master attempted to send message to disconnected"
+                   << " framework " << *this;
+      return;
+    }
+
+    if (http.isSome()) {
+      const scheduler::Event event = protobuf::scheduler::event(message);
+
+      if (!http.get().writer.write(http.get().encoder.encode(event))) {
+        LOG(WARNING) << "Unable to send event to framework " << *this << ":"
+                     << " connection closed";
+      }
+    } else {
+      CHECK_SOME(pid);
+      master->send(pid.get(), message);
+    }
+  }
+
   void addCompletedTask(const Task& task)
   {
     // TODO(adam-mesos): Check if completed task already exists.
@@ -1420,9 +1495,22 @@ struct Framework
     }
   }
 
+  Master* const master;
+
   FrameworkInfo info;
 
-  process::UPID pid;
+  struct Http
+  {
+    process::http::Pipe::Writer writer;
+    recordio::Encoder<scheduler::Event> encoder;
+  };
+
+  // Frameworks can either be connected via HTTP or by message
+  // passing (scheduler driver). Exactly one of 'http' and 'pid'
+  // will be set according to the last connection made by the
+  // framework.
+  Option<Http> http;
+  Option<process::UPID> pid;
 
   // Framework becomes disconnected when the socket closes.
   bool connected;
@@ -1493,8 +1581,13 @@ inline std::ostream& operator << (
 {
   // TODO(vinod): Also log the hostname once FrameworkInfo is properly
   // updated on framework failover (MESOS-1784).
-  return stream << framework.id() << " (" << framework.info.name()
-                << ") at " << framework.pid;
+  stream << framework.id() << " (" << framework.info.name() << ")";
+
+  if (framework.pid.isSome()) {
+    stream << " at " << framework.pid.get();
+  }
+
+  return stream;
 }