You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/27 21:42:31 UTC

mesos git commit: Fixed resource provider driver disconnection handling.

Repository: mesos
Updated Branches:
  refs/heads/master 57f37c8e6 -> 41cdab963


Fixed resource provider driver disconnection handling.

The expectation for disconnection of the resource provider driver is
that disconnection handlers of the resource provider would be invoked
and a new connection would be detected.

This patch fixes the issue by transitioning the future returned by
ConstantEndpointDetector to DISCARDED if a "discard" is initiated by
the caller. This properly triggers the `detected` callback.

This patch is based on: https://reviews.apache.org/r/64806/

Review: https://reviews.apache.org/r/64856


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/41cdab96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/41cdab96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/41cdab96

Branch: refs/heads/master
Commit: 41cdab963121cead0c0937ac821c8d7cdbeddacf
Parents: 57f37c8
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Dec 22 13:24:53 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 27 13:42:03 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/detector.cpp            | 26 +++++-
 src/resource_provider/http_connection.hpp     |  1 +
 src/tests/resource_provider_manager_tests.cpp | 96 ++++++++++++++++++++++
 3 files changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/resource_provider/detector.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/detector.cpp b/src/resource_provider/detector.cpp
index 59f2b9b..e878d54 100644
--- a/src/resource_provider/detector.cpp
+++ b/src/resource_provider/detector.cpp
@@ -16,9 +16,15 @@
 
 #include "resource_provider/detector.hpp"
 
+#include <memory>
+#include <utility>
+
+#include <stout/lambda.hpp>
+
 namespace http = process::http;
 
 using process::Future;
+using process::Promise;
 
 namespace mesos {
 namespace internal {
@@ -33,7 +39,25 @@ Future<Option<http::URL>> ConstantEndpointDetector::detect(
   if (previous.isNone() || stringify(previous.get()) != stringify(url)) {
     return url;
   } else {
-    return Future<Option<http::URL>>(); // A pending future.
+    // Use a promise here to properly handle discard semantics.
+    std::unique_ptr<Promise<Option<http::URL>>> promise(
+        new Promise<Option<http::URL>>());
+
+    Future<Option<http::URL>> future = promise->future();
+
+    // TODO(jieyu): There is a cyclic dependency here because `future`
+    // holds a reference to `promise` and `promise` holds a reference
+    // to the `future`. It won't get properly cleaned up if
+    // `future.discard()` is not called and `future` is not terminal.
+    // Currently, it's OK because the caller always does a
+    // `future.discard()` before removing the reference to `future`.
+    future.onDiscard(lambda::partial(
+        [](std::unique_ptr<Promise<Option<http::URL>>> promise) {
+          promise->discard();
+        },
+        std::move(promise)));
+
+    return future;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/resource_provider/http_connection.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
index 3d5088d..add5acc 100644
--- a/src/resource_provider/http_connection.hpp
+++ b/src/resource_provider/http_connection.hpp
@@ -322,6 +322,7 @@ protected:
     subscribed = None();
     endpoint = None();
     connectionId = None();
+    detection.discard();
   }
 
   void disconnected(const id::UUID& _connectionId, const std::string& failure)

http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index f58ab6b..096747e 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -90,7 +90,9 @@ using process::http::UnsupportedMediaType;
 using std::string;
 using std::vector;
 
+using testing::DoAll;
 using testing::Eq;
+using testing::Invoke;
 using testing::SaveArg;
 using testing::Values;
 using testing::WithParamInterface;
@@ -1295,6 +1297,100 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
   }
 }
 
+
+// This test verifies that if a second resource provider subscribes
+// with the ID of an already connected resource provider, the first
+// instance gets disconnected and the second subscription is handled
+// as a resubscription.
+TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
+{
+  Clock::pause();
+
+  // Start master and agent.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(updateSlaveMessage);
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Owned<v1::MockResourceProvider> resourceProvider1(
+      new v1::MockResourceProvider(resourceProviderInfo));
+
+  // Start and register a resource provider.
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  http::URL url(
+      scheme,
+      agent.get()->pid.address.ip,
+      agent.get()->pid.address.port,
+      agent.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  Future<Event::Subscribed> subscribed1;
+  EXPECT_CALL(*resourceProvider1, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed1));
+
+  resourceProvider1->start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(subscribed1);
+
+  resourceProviderInfo.mutable_id()->CopyFrom(subscribed1->provider_id());
+
+  // Subscribing a second resource provider with the same ID will
+  // disconnect the first instance and handle the subscription by the
+  // second resource provider as a resubscription.
+  Owned<v1::MockResourceProvider> resourceProvider2(
+      new v1::MockResourceProvider(resourceProviderInfo));
+
+  // We terminate the first resource provider once we have confirmed
+  // that it got disconnected. This prevents it from resubscribing in
+  // turn and racing with the other resource provider.
+  Future<Nothing> disconnected1;
+  EXPECT_CALL(*resourceProvider1, disconnected())
+    .WillOnce(DoAll(
+        FutureSatisfy(&disconnected1),
+        Invoke([&resourceProvider1]() { resourceProvider1.reset(); })))
+    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.
+
+  Future<Event::Subscribed> subscribed2;
+  EXPECT_CALL(*resourceProvider2, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed2));
+
+  resourceProvider2->start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(disconnected1);
+  AWAIT_READY(subscribed2);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {