You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/28 18:55:21 UTC

[03/10] mesos git commit: Cleaned up the previous queueing Calls logic.

Cleaned up the previous queueing Calls logic.

This change cleans up the previous queueing calls logic in favor of HTTP
connection pipelining in the scheduler library. This review clears up the noise
of the pipelining diff in subsequent reviews.

Review: https://reviews.apache.org/r/43659/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5651e441
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5651e441
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5651e441

Branch: refs/heads/master
Commit: 5651e44159aea6e737da4122b455e54a7995b3c6
Parents: bf5439a
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sun Feb 28 09:51:36 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sun Feb 28 09:51:36 2016 -0800

----------------------------------------------------------------------
 src/scheduler/scheduler.cpp | 114 ++++++++++++++-------------------------
 1 file changed, 41 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5651e441/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 3109ac0..ec688d6 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -189,18 +189,50 @@ public:
 
   void send(const Call& call)
   {
-    // NOTE: We enqueue the calls to guarantee that a call is sent only after
-    // a response has been received for the previous call.
-    // TODO(vinod): Use HTTP pipelining instead.
-    calls.push(call);
+    if (master.isNone()) {
+      drop(call, "Disconnected");
+      return;
+    }
+
+    Option<Error> error = validation::scheduler::call::validate(devolve(call));
 
-    if (calls.size() > 1) {
+    if (error.isSome()) {
+      drop(call, error.get().message);
       return;
     }
 
-    // If this is the first in the queue send the call.
-    _send(call)
-      .onAny(defer(self(), &Self::___send));
+    VLOG(1) << "Sending " << call.type() << " call to " << master.get();
+
+    // TODO(vinod): Add support for sending MESSAGE calls directly
+    // to the slave, instead of relaying it through the master, as
+    // the scheduler driver does.
+
+    const string body = serialize(contentType, call);
+    const http::Headers headers{{"Accept", stringify(contentType)}};
+
+    Future<Response> response;
+
+    if (call.type() == Call::SUBSCRIBE) {
+      // Each subscription requires a new connection.
+      disconnect();
+
+      // Send a streaming request for Subscribe call.
+      response = process::http::streaming::post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    } else {
+      response = post(
+          master.get(),
+          "api/v1/scheduler",
+          headers,
+          body,
+          stringify(contentType));
+    }
+
+    response.onAny(defer(self(), &Self::_send, call, lambda::_1));
   }
 
 protected:
@@ -282,57 +314,7 @@ protected:
     LOG(WARNING) << "Dropping " << call.type() << ": " << message;
   }
 
-  Future<Nothing> _send(const Call& call)
-  {
-    if (master.isNone()) {
-      drop(call, "Disconnected");
-      return Nothing();
-    }
-
-    Option<Error> error = validation::scheduler::call::validate(devolve(call));
-
-    if (error.isSome()) {
-      drop(call, error.get().message);
-      return Nothing();
-    }
-
-    VLOG(1) << "Sending " << call.type() << " call to " << master.get();
-
-    // TODO(vinod): Add support for sending MESSAGE calls directly
-    // to the slave, instead of relaying it through the master, as
-    // the scheduler driver does.
-
-    const string body = serialize(contentType, call);
-    const http::Headers headers{{"Accept", stringify(contentType)}};
-
-    Future<Response> response;
-
-    if (call.type() == Call::SUBSCRIBE) {
-      // Each subscription requires a new connection.
-      disconnect();
-
-      // Send a streaming request for Subscribe call.
-      response = process::http::streaming::post(
-          master.get(),
-          "api/v1/scheduler",
-          headers,
-          body,
-          stringify(contentType));
-    } else {
-      response = post(
-          master.get(),
-          "api/v1/scheduler",
-          headers,
-          body,
-          stringify(contentType));
-    }
-
-    return response
-      .onAny(defer(self(), &Self::__send, call, lambda::_1))
-      .then([]() { return Nothing(); });
-  }
-
-  void __send(const Call& call, const Future<Response>& response)
+  void _send(const Call& call, const Future<Response>& response)
   {
     CHECK(!response.isDiscarded());
 
@@ -386,18 +368,6 @@ protected:
           response.get().body + ") for " + stringify(call.type()));
   }
 
-  void ___send()
-  {
-    CHECK_LT(0u, calls.size());
-    calls.pop();
-
-    // Execute the next event in the queue.
-    if (!calls.empty()) {
-      _send(calls.front())
-        .onAny(defer(self(), &Self::___send));
-    }
-  }
-
   void read()
   {
     connection.get().decoder->read()
@@ -505,8 +475,6 @@ private:
 
   queue<Event> events;
 
-  queue<Call> calls;
-
   Option<UPID> master;
 };