You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2019/01/30 05:17:58 UTC
[mesos] branch master updated: Fixed scheduler library on multiple
SUBSCRIBE requests per connection.
This is an automated email from the ASF dual-hosted git repository.
tillt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new a5b9fca Fixed scheduler library on multiple SUBSCRIBE requests per connection.
a5b9fca is described below
commit a5b9fcafbdf8663707e19c818be8a2da1eff8622
Author: Till Toenshoff <to...@me.com>
AuthorDate: Wed Jan 30 05:37:17 2019 +0100
Fixed scheduler library on multiple SUBSCRIBE requests per connection.
The HTTP scheduler API dictates that on a single connection, the
scheduler may only send a single SUBSCRIBE request. Due to recent
authentication-related changes, this contract was broken. This patch
restores the contract and adds a test validating that the library is
enforcing it.
Review: https://reviews.apache.org/r/69839/
---
src/scheduler/scheduler.cpp | 40 +++++++++++++++++----------------
src/tests/scheduler_tests.cpp | 52 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 73 insertions(+), 19 deletions(-)
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index cb24ba9..674483a 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -231,22 +231,6 @@ public:
return;
}
- if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
- // It might be possible that the scheduler is retrying. We drop the
- // request if we have an ongoing subscribe request in flight or if the
- // scheduler is already subscribed.
- drop(call, "Scheduler is in state " + stringify(state));
- return;
- }
-
- if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
- // We drop all non-subscribe calls if we are not currently subscribed.
- drop(call, "Scheduler is in state " + stringify(state));
- return;
- }
-
- VLOG(1) << "Sending " << call.type() << " call to " << master.get();
-
// TODO(vinod): Add support for sending MESSAGE calls directly
// to the slave, instead of relaying it through the master, as
// the scheduler driver does.
@@ -259,6 +243,9 @@ public:
request.headers = {{"Accept", stringify(contentType)},
{"Content-Type", stringify(contentType)}};
+ VLOG(1) << "Adding authentication headers to " << call.type() << " call to "
+ << master.get();
+
// TODO(tillt): Add support for multi-step authentication protocols.
authenticatee->authenticate(request, credential)
.onAny(defer(self(), &Self::_send, call, lambda::_1));
@@ -584,9 +571,22 @@ protected:
void _send(const Call& call, const Future<process::http::Request>& future)
{
if (!future.isReady()) {
- LOG(ERROR) << "HTTP authenticatee "
- << (future.isFailed() ? "failed: " + future.failure()
- : "discarded");
+ LOG(ERROR) << "HTTP authenticatee failed while adding authentication"
+ << " header to request: " << future;
+ return;
+ }
+
+ if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
+ // It might be possible that the scheduler is retrying. We drop the
+ // request if we have an ongoing subscribe request in flight or if the
+ // scheduler is already subscribed.
+ drop(call, "Scheduler is in state " + stringify(state));
+ return;
+ }
+
+ if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
+ // We drop all non-subscribe calls if we are not currently subscribed.
+ drop(call, "Scheduler is in state " + stringify(state));
return;
}
@@ -597,6 +597,8 @@ protected:
return;
}
+ VLOG(1) << "Sending " << call.type() << " call to " << master.get();
+
Future<process::http::Response> response;
if (call.type() == Call::SUBSCRIBE) {
state = SUBSCRIBING;
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index b571bb1..5fb6960 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -162,6 +162,58 @@ TEST_P(SchedulerTest, Subscribe)
}
+// Test validates that the scheduler library will not allow multiple
+// SUBSCRIBE requests over the same connection.
+TEST_P(SchedulerTest, SubscribeDrop)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected));
+
+ ContentType contentType = GetParam();
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ contentType,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Nothing> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureSatisfy(&subscribed));
+
+ Future<Nothing> heartbeat;
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillOnce(FutureSatisfy(&heartbeat));
+
+ Clock::pause();
+
+ mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ // Send another SUBSCRIBE request. This one should get dropped as we
+ // already have a SUBSCRIBE in flight on that same connection.
+
+ mesos.send(v1::createCallSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ AWAIT_READY(subscribed);
+ AWAIT_READY(heartbeat);
+
+ Clock::resume();
+
+ {
+ JSON::Object metrics = Metrics();
+
+ EXPECT_EQ(1u, metrics.values["master/messages_register_framework"]);
+ }
+}
+
+
// This test verifies that a scheduler can subscribe with the master after
// failing over to another instance.
TEST_P(SchedulerTest, SchedulerFailover)