You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/27 21:42:31 UTC
mesos git commit: Fixed resource provider driver disconnection
handling.
Repository: mesos
Updated Branches:
refs/heads/master 57f37c8e6 -> 41cdab963
Fixed resource provider driver disconnection handling.
When the resource provider driver disconnects, the expectation is that
the resource provider's disconnection handlers are invoked and a new
connection is detected.
This patch fixes the issue by transitioning the future returned by
ConstantEndpointDetector to DISCARDED when the caller initiates a
"discard". This properly triggers the `detected` callback.
This patch is based on: https://reviews.apache.org/r/64806/
Review: https://reviews.apache.org/r/64856
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/41cdab96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/41cdab96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/41cdab96
Branch: refs/heads/master
Commit: 41cdab963121cead0c0937ac821c8d7cdbeddacf
Parents: 57f37c8
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Dec 22 13:24:53 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 27 13:42:03 2017 -0800
----------------------------------------------------------------------
src/resource_provider/detector.cpp | 26 +++++-
src/resource_provider/http_connection.hpp | 1 +
src/tests/resource_provider_manager_tests.cpp | 96 ++++++++++++++++++++++
3 files changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/resource_provider/detector.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/detector.cpp b/src/resource_provider/detector.cpp
index 59f2b9b..e878d54 100644
--- a/src/resource_provider/detector.cpp
+++ b/src/resource_provider/detector.cpp
@@ -16,9 +16,15 @@
#include "resource_provider/detector.hpp"
+#include <memory>
+#include <utility>
+
+#include <stout/lambda.hpp>
+
namespace http = process::http;
using process::Future;
+using process::Promise;
namespace mesos {
namespace internal {
@@ -33,7 +39,25 @@ Future<Option<http::URL>> ConstantEndpointDetector::detect(
if (previous.isNone() || stringify(previous.get()) != stringify(url)) {
return url;
} else {
- return Future<Option<http::URL>>(); // A pending future.
+ // Use a promise here to properly handle discard semantics.
+ std::unique_ptr<Promise<Option<http::URL>>> promise(
+ new Promise<Option<http::URL>>());
+
+ Future<Option<http::URL>> future = promise->future();
+
+ // TODO(jieyu): There is a cyclic dependency here because `future`
+ // holds a reference to `promise` and `promise` holds a reference
+ // to the `future`. It won't get properly cleaned up if
+ // `future.discard()` is not called and `future` is not terminal.
+ // Currently, it's OK because the caller always do a
+ // `future.discard()` before removing the reference to `future`.
+ future.onDiscard(lambda::partial(
+ [](std::unique_ptr<Promise<Option<http::URL>>> promise) {
+ promise->discard();
+ },
+ std::move(promise)));
+
+ return future;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/resource_provider/http_connection.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
index 3d5088d..add5acc 100644
--- a/src/resource_provider/http_connection.hpp
+++ b/src/resource_provider/http_connection.hpp
@@ -322,6 +322,7 @@ protected:
subscribed = None();
endpoint = None();
connectionId = None();
+ detection.discard();
}
void disconnected(const id::UUID& _connectionId, const std::string& failure)
http://git-wip-us.apache.org/repos/asf/mesos/blob/41cdab96/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index f58ab6b..096747e 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -90,7 +90,9 @@ using process::http::UnsupportedMediaType;
using std::string;
using std::vector;
+using testing::DoAll;
using testing::Eq;
+using testing::Invoke;
using testing::SaveArg;
using testing::Values;
using testing::WithParamInterface;
@@ -1295,6 +1297,100 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
}
}
+
+// This test verifies that if a second resource provider subscribes
+// with the ID of an already connected resource provider, the first
+// instance is disconnected and the second instance's subscription
+// (handled as a resubscription) succeeds.
+TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
+{
+ Clock::pause();
+
+ // Start master and agent.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(agent);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::settle();
+ AWAIT_READY(updateSlaveMessage);
+
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ Owned<v1::MockResourceProvider> resourceProvider1(
+ new v1::MockResourceProvider(resourceProviderInfo));
+
+ // Start and subscribe the first resource provider via the agent's API.
+ string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ http::URL url(
+ scheme,
+ agent.get()->pid.address.ip,
+ agent.get()->pid.address.port,
+ agent.get()->pid.id + "/api/v1/resource_provider");
+
+ Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+ Future<Event::Subscribed> subscribed1;
+ EXPECT_CALL(*resourceProvider1, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed1));
+
+ resourceProvider1->start(
+ endpointDetector,
+ ContentType::PROTOBUF,
+ v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(subscribed1);
+
+ resourceProviderInfo.mutable_id()->CopyFrom(subscribed1->provider_id());
+
+ // Subscribing a second resource provider with the same ID will
+ // disconnect the first instance and handle the subscription by the
+ // second resource provider as a resubscription.
+ Owned<v1::MockResourceProvider> resourceProvider2(
+ new v1::MockResourceProvider(resourceProviderInfo));
+
+ // Terminate the first resource provider once it has observed the
+ // disconnection. This prevents it from resubscribing in turn and
+ // racing with the second resource provider.
+ Future<Nothing> disconnected1;
+ EXPECT_CALL(*resourceProvider1, disconnected())
+ .WillOnce(DoAll(
+ FutureSatisfy(&disconnected1),
+ Invoke([&resourceProvider1]() { resourceProvider1.reset(); })))
+ .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.
+
+ Future<Event::Subscribed> subscribed2;
+ EXPECT_CALL(*resourceProvider2, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed2));
+
+ resourceProvider2->start(
+ endpointDetector,
+ ContentType::PROTOBUF,
+ v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(disconnected1);
+ AWAIT_READY(subscribed2);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {