You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/12/14 21:32:09 UTC

[mesos] 05/06: Changed master to hold subscribers in a circular buffer.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 605d8fffc229ab209ae0f4de88c5ecf4bb33c96e
Author: Joseph Wu <jo...@mesosphere.io>
AuthorDate: Thu Dec 13 16:35:03 2018 -0800

    Changed master to hold subscribers in a circular buffer.
    
    This adds a flag (--max_operator_event_stream_subscribers) to the
    master which controls how many active subscribers on the Master's
    event stream will be allowed at any time.
    
    The default is 1000 subscribers, which is deliberately higher
    than we expect to be needed. Operators who know that their network
    has clients/proxies that do not close connections have the
    option of lowering this flag.
    
    Review: https://reviews.apache.org/r/69307/
---
 docs/configuration/master.md |  16 +++++
 src/master/constants.hpp     |   4 ++
 src/master/flags.cpp         |  11 +++
 src/master/flags.hpp         |   1 +
 src/master/master.cpp        |  17 ++++-
 src/master/master.hpp        |   6 +-
 src/tests/api_tests.cpp      | 167 +++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 217 insertions(+), 5 deletions(-)

diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 5754767..83b83b0 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -444,6 +444,22 @@ Maximum number of completed tasks per framework to store in memory. (default: 10
   </td>
 </tr>
 
+<tr id="max_operator_event_stream_subscribers">
+  <td>
+    --max_operator_event_stream_subscribers=VALUE
+  </td>
+  <td>
+Maximum number of simultaneous subscribers to the master's operator event
+stream. If new connections bring the total number of subscribers over this
+value, the oldest connections will be closed by the master.
+
+This flag should generally not be changed unless the operator is mitigating
+known problems with their network setup, such as clients/proxies that do not
+close connections to the master.
+(default: 1000)
+  </td>
+</tr>
+
 <tr id="max_unreachable_tasks_per_framework">
   <td>
     --max_unreachable_tasks_per_framework=VALUE
diff --git a/src/master/constants.hpp b/src/master/constants.hpp
index 76ad0c3..b0ab918 100644
--- a/src/master/constants.hpp
+++ b/src/master/constants.hpp
@@ -91,6 +91,10 @@ constexpr double RECOVERY_AGENT_REMOVAL_PERCENT_LIMIT = 1.0; // 100%.
 // Maximum number of removed slaves to store in the cache.
 constexpr size_t MAX_REMOVED_SLAVES = 100000;
 
+// Default for `--max_operator_event_stream_subscribers`: the maximum number
+// of simultaneous subscribers to the master's operator event stream.
+constexpr size_t DEFAULT_MAX_OPERATOR_EVENT_STREAM_SUBSCRIBERS = 1000;
+
 // Default maximum number of completed frameworks to store in the cache.
 constexpr size_t DEFAULT_MAX_COMPLETED_FRAMEWORKS = 50;
 
diff --git a/src/master/flags.cpp b/src/master/flags.cpp
index 2677738..f9b68bc 100644
--- a/src/master/flags.cpp
+++ b/src/master/flags.cpp
@@ -572,6 +572,17 @@ mesos::internal::master::Flags::Flags()
       "Currently there is no support for multiple HTTP framework\n"
       "authenticators.");
 
+  add(&Flags::max_operator_event_stream_subscribers,
+      "max_operator_event_stream_subscribers",
+      "Maximum number of simultaneous subscribers to the master's operator\n"
+      "event stream. If new connections bring the total number of subscribers\n"
+      "over this value, older connections will be closed by the master.\n"
+      "\n"
+      "This flag should generally not be changed unless the operator is\n"
+      "mitigating known problems with their network setup, such as\n"
+      "clients/proxies that do not close connections to the master.",
+      DEFAULT_MAX_OPERATOR_EVENT_STREAM_SUBSCRIBERS);
+
   add(&Flags::max_completed_frameworks,
       "max_completed_frameworks",
       "Maximum number of completed frameworks to store in memory.",
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index ed2d76a..78623d6 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -90,6 +90,7 @@ public:
   std::string authorizers;
   std::string http_authenticators;
   Option<std::string> http_framework_authenticators;
+  size_t max_operator_event_stream_subscribers;
   size_t max_completed_frameworks;
   size_t max_completed_tasks_per_framework;
   size_t max_unreachable_tasks_per_framework;
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5443bd1..53aa7a1 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -325,7 +325,7 @@ Master::Master(
     detector(_detector),
     authorizer(_authorizer),
     frameworks(flags),
-    subscribers(this),
+    subscribers(this, flags.max_operator_event_stream_subscribers),
     authenticator(None()),
     metrics(new Metrics(*this)),
     electedTime(None())
@@ -12129,7 +12129,9 @@ void Master::Subscribers::Subscriber::send(
 void Master::exited(const id::UUID& id)
 {
   if (!subscribers.subscribed.contains(id)) {
-    LOG(WARNING) << "Unknown subscriber " << id << " disconnected";
+    // NOTE: This is only possible when the master closes an event stream
+    // by deleting the subscriber from the `subscribed` map. There will
+    // be separate logging when that happens.
     return;
   }
 
@@ -12153,7 +12155,16 @@ void Master::subscribe(
              exited(http.streamId);
            }));
 
-  subscribers.subscribed.put(
+  if (subscribers.subscribed.size() >=
+      flags.max_operator_event_stream_subscribers) {
+    LOG(INFO)
+      << "Reached the maximum number of operator event stream subscribers ("
+      << flags.max_operator_event_stream_subscribers << ") so the oldest "
+      << "connection (" << std::get<0>(*subscribers.subscribed.begin())
+      << ") will be closed";
+  }
+
+  subscribers.subscribed.set(
       http.streamId,
       Owned<Subscribers::Subscriber>(
           new Subscribers::Subscriber{http, principal}));
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0c1821c..99549ab 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2052,7 +2052,9 @@ private:
 
   struct Subscribers
   {
-    Subscribers(Master* _master) : master(_master) {};
+    Subscribers(Master* _master, size_t maxSubscribers)
+      : master(_master),
+        subscribed(maxSubscribers) {};
 
     // Represents a client subscribed to the 'api/vX' endpoint.
     //
@@ -2112,7 +2114,7 @@ private:
 
     // Active subscribers to the 'api/vX' endpoint keyed by the stream
     // identifier.
-    hashmap<id::UUID, process::Owned<Subscriber>> subscribed;
+    BoundedHashMap<id::UUID, process::Owned<Subscriber>> subscribed;
   };
 
   Subscribers subscribers;
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index edde48a..80104a7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -36,6 +36,7 @@
 #include <stout/gtest.hpp>
 #include <stout/jsonify.hpp>
 #include <stout/nothing.hpp>
+#include <stout/numify.hpp>
 #include <stout/recordio.hpp>
 #include <stout/stringify.hpp>
 #include <stout/try.hpp>
@@ -3583,6 +3584,172 @@ TEST_P(MasterAPITest, Heartbeat)
 }
 
 
+// Verifies that the oldest subscriber is disconnected once the number of
+// subscribers to the master's event stream exceeds the configured maximum.
+TEST_P(MasterAPITest, MaxEventStreamSubscribers)
+{
+  Clock::pause();
+
+  ContentType contentType = GetParam();
+
+  // Allow at most two subscribers, so that a third one forces an eviction.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.max_operator_event_stream_subscribers = 2;
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Define some objects we'll use for all the SUBSCRIBE calls.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Future<Result<v1::master::Event>> event;
+
+  // Open two subscriptions to fill the subscriber buffer to its maximum.
+  Future<http::Response> response1 = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response1);
+  ASSERT_EQ(http::Response::PIPE, response1->type);
+  ASSERT_SOME(response1->reader);
+  http::Pipe::Reader reader1 = response1->reader.get();
+
+  Reader<v1::master::Event> decoder1(
+      Decoder<v1::master::Event>(deserializer), reader1);
+
+  event = decoder1.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  event = decoder1.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  Future<http::Response> response2 = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response2);
+  ASSERT_EQ(http::Response::PIPE, response2->type);
+  ASSERT_SOME(response2->reader);
+  http::Pipe::Reader reader2 = response2->reader.get();
+
+  Reader<v1::master::Event> decoder2(
+      Decoder<v1::master::Event>(deserializer), reader2);
+
+  event = decoder2.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  event = decoder2.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  // Start a third connection, which should evict the first (oldest) one.
+  {
+    // This is basically `http::streaming::post` unwrapped inside the
+    // test body. We must do this in order to control the lifetime of
+    // the `http::Connection`. The HTTP helper will keep the streaming
+    // connection alive until the server closes it, but this test wants
+    // to prematurely close the connection.
+    http::URL url(
+        "http",
+        master.get()->pid.address.ip,
+        master.get()->pid.address.port,
+        strings::join("/", master.get()->pid.id, "api/v1"));
+
+    http::Request request;
+    request.method = "POST";
+    request.url = url;
+    request.keepAlive = false;
+    request.headers = headers;
+    request.body = serialize(contentType, v1Call);
+    request.headers["Content-Type"] = stringify(contentType);
+
+    Future<http::Connection> connection = http::connect(request.url);
+
+    Future<http::Response> response3 = connection
+      .then([request](http::Connection connection) {
+        return connection.send(request, true);
+      });
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response3);
+    ASSERT_EQ(http::Response::PIPE, response3->type);
+    ASSERT_SOME(response3->reader);
+    http::Pipe::Reader reader3 = response3->reader.get();
+
+    Reader<v1::master::Event> decoder3(
+        Decoder<v1::master::Event>(deserializer), reader3);
+
+    event = decoder3.read();
+    AWAIT_READY(event);
+    ASSERT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+    event = decoder3.read();
+    AWAIT_READY(event);
+    ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+    // The first (oldest) connection should have been closed by the master.
+    event = decoder1.read();
+    AWAIT_READY(event);
+    ASSERT_TRUE(event->isNone());
+
+    // Leaving this scope destroys the connection, which should bring the
+    // number of active subscribers back below the maximum.
+  }
+
+  // Verify that the second connection is still open.
+  Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+  event = decoder2.read();
+  AWAIT_READY(event);
+  ASSERT_TRUE(event->isSome());
+  ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  // Start a fourth connection. This should be under the maximum
+  // and should not cause any disconnections.
+  Future<http::Response> response4 = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response4);
+  ASSERT_EQ(http::Response::PIPE, response4->type);
+  ASSERT_SOME(response4->reader);
+  http::Pipe::Reader reader4 = response4->reader.get();
+
+  Reader<v1::master::Event> decoder4(
+      Decoder<v1::master::Event>(deserializer), reader4);
+
+  event = decoder4.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  event = decoder4.read();
+  AWAIT_READY(event);
+  ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  // Verify that the second connection was not evicted by the fourth.
+  Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+  event = decoder2.read();
+  AWAIT_READY(event);
+  ASSERT_TRUE(event->isSome());
+  ASSERT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  Clock::resume();
+}
+
+
 // This test verifies if we can retrieve the current quota status through
 // `GET_QUOTA` call, after we set quota resources through `SET_QUOTA` call.
 TEST_P(MasterAPITest, GetQuota)