You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2019/01/30 05:17:58 UTC

[mesos] branch master updated: Fixed scheduler library on multiple SUBSCRIBE requests per connection.

This is an automated email from the ASF dual-hosted git repository.

tillt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new a5b9fca  Fixed scheduler library on multiple SUBSCRIBE requests per connection.
a5b9fca is described below

commit a5b9fcafbdf8663707e19c818be8a2da1eff8622
Author: Till Toenshoff <to...@me.com>
AuthorDate: Wed Jan 30 05:37:17 2019 +0100

    Fixed scheduler library on multiple SUBSCRIBE requests per connection.
    
    The HTTP scheduler API dictates that on a single connection, the
    scheduler may only send a single SUBSCRIBE request. Due to recent
    authentication related changes, this contract got broken. This patch
    restores the contract and adds a test validating that the library is
    enforcing it.
    
    Review: https://reviews.apache.org/r/69839/
---
 src/scheduler/scheduler.cpp   | 40 +++++++++++++++++----------------
 src/tests/scheduler_tests.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 73 insertions(+), 19 deletions(-)

diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index cb24ba9..674483a 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -231,22 +231,6 @@ public:
       return;
     }
 
-    if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
-      // It might be possible that the scheduler is retrying. We drop the
-      // request if we have an ongoing subscribe request in flight or if the
-      // scheduler is already subscribed.
-      drop(call, "Scheduler is in state " + stringify(state));
-      return;
-    }
-
-    if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
-      // We drop all non-subscribe calls if we are not currently subscribed.
-      drop(call, "Scheduler is in state " + stringify(state));
-      return;
-    }
-
-    VLOG(1) << "Sending " << call.type() << " call to " << master.get();
-
     // TODO(vinod): Add support for sending MESSAGE calls directly
     // to the slave, instead of relaying it through the master, as
     // the scheduler driver does.
@@ -259,6 +243,9 @@ public:
     request.headers = {{"Accept", stringify(contentType)},
                        {"Content-Type", stringify(contentType)}};
 
+    VLOG(1) << "Adding authentication headers to " << call.type() << " call to "
+            << master.get();
+
     // TODO(tillt): Add support for multi-step authentication protocols.
     authenticatee->authenticate(request, credential)
       .onAny(defer(self(), &Self::_send, call, lambda::_1));
@@ -584,9 +571,22 @@ protected:
   void _send(const Call& call, const Future<process::http::Request>& future)
   {
     if (!future.isReady()) {
-      LOG(ERROR) << "HTTP authenticatee "
-                 << (future.isFailed() ? "failed: " + future.failure()
-                                       : "discarded");
+      LOG(ERROR) << "HTTP authenticatee failed while adding authentication"
+                 << " header to request: " << future;
+      return;
+    }
+
+    if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
+      // It might be possible that the scheduler is retrying. We drop the
+      // request if we have an ongoing subscribe request in flight or if the
+      // scheduler is already subscribed.
+      drop(call, "Scheduler is in state " + stringify(state));
+      return;
+    }
+
+    if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
+      // We drop all non-subscribe calls if we are not currently subscribed.
+      drop(call, "Scheduler is in state " + stringify(state));
       return;
     }
 
@@ -597,6 +597,8 @@ protected:
       return;
     }
 
+    VLOG(1) << "Sending " << call.type() << " call to " << master.get();
+
     Future<process::http::Response> response;
     if (call.type() == Call::SUBSCRIBE) {
       state = SUBSCRIBING;
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index b571bb1..5fb6960 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -162,6 +162,58 @@ TEST_P(SchedulerTest, Subscribe)
 }
 
 
+// Test validates that the scheduler library will not allow multiple
+// SUBSCRIBE requests over the same connection.
+TEST_P(SchedulerTest, SubscribeDrop)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  ContentType contentType = GetParam();
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      contentType,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Nothing> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureSatisfy(&subscribed));
+
+  Future<Nothing> heartbeat;
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillOnce(FutureSatisfy(&heartbeat));
+
+  Clock::pause();
+
+  mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  // Send another SUBSCRIBE request. This one should get dropped as we
+  // already have a SUBSCRIBE in flight on that same connection.
+
+  mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  AWAIT_READY(subscribed);
+  AWAIT_READY(heartbeat);
+
+  Clock::resume();
+
+  {
+    JSON::Object metrics = Metrics();
+
+    EXPECT_EQ(1u, metrics.values["master/messages_register_framework"]);
+  }
+}
+
+
 // This test verifies that a scheduler can subscribe with the master after
 // failing over to another instance.
 TEST_P(SchedulerTest, SchedulerFailover)