You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/01/18 10:30:30 UTC
[1/2] mesos git commit: Added a helper function for resource provider
tests.
Repository: mesos
Updated Branches:
refs/heads/master a3e1f9b82 -> 8cf6b0882
Added a helper function for resource provider tests.
Review: https://reviews.apache.org/r/65125/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/75659c96
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/75659c96
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/75659c96
Branch: refs/heads/master
Commit: 75659c9613b768cde6ca8afe7fbdaa1306b0cef8
Parents: a3e1f9b
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jan 17 15:22:47 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Jan 18 11:27:37 2018 +0100
----------------------------------------------------------------------
src/tests/api_tests.cpp | 36 ++--------
src/tests/mesos.hpp | 23 +++++++
src/tests/resource_provider_manager_tests.cpp | 80 +++-------------------
src/tests/slave_tests.cpp | 70 +++----------------
4 files changed, 45 insertions(+), 164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/75659c96/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 6faefc9..5d8e32f 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -33,8 +33,6 @@
#include <process/http.hpp>
#include <process/owned.hpp>
-#include <process/ssl/flags.hpp>
-
#include <stout/gtest.hpp>
#include <stout/jsonify.hpp>
#include <stout/nothing.hpp>
@@ -232,21 +230,8 @@ TEST_P(MasterAPITest, GetAgents)
"200", "*", None(), None(), v1::createDiskSourceRaw()));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
updateAgentMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
@@ -6159,21 +6144,8 @@ TEST_P(AgentAPITest, GetResourceProviders)
"200", "*", None(), None(), v1::createDiskSourceRaw()));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- slave.get()->pid.address.ip,
- slave.get()->pid.address.port,
- slave.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(slave.get()->pid));
updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
http://git-wip-us.apache.org/repos/asf/mesos/blob/75659c96/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 16c75bb..8a2be90 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -46,6 +46,7 @@
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <process/http.hpp>
#include <process/io.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
@@ -53,6 +54,7 @@
#include <process/queue.hpp>
#include <process/subprocess.hpp>
+#include <process/ssl/flags.hpp>
#include <process/ssl/gtest.hpp>
#include <stout/bytes.hpp>
@@ -3105,6 +3107,27 @@ private:
std::unique_ptr<Driver> driver;
};
+inline process::Owned<EndpointDetector> createEndpointDetector(
+ const process::UPID& pid)
+{
+ // Build the URL of the resource provider API endpoint served by `pid`.
+ std::string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ process::http::URL url(
+ scheme,
+ pid.address.ip,
+ pid.address.port,
+ pid.id + "/api/v1/resource_provider");
+
+ return process::Owned<EndpointDetector>(new ConstantEndpointDetector(url));
+}
+
} // namespace resource_provider {
http://git-wip-us.apache.org/repos/asf/mesos/blob/75659c96/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index d80823c..2944b06 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -34,8 +34,6 @@
#include <process/gtest.hpp>
#include <process/http.hpp>
-#include <process/ssl/flags.hpp>
-
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/gtest.hpp>
@@ -80,7 +78,6 @@ using process::Clock;
using process::Future;
using process::Message;
using process::Owned;
-using process::PID;
using process::http::Accepted;
using process::http::BadRequest;
@@ -956,21 +953,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
resourceProviderInfo, Some(v1::Resources(disk)));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
const ContentType contentType = GetParam();
@@ -1095,21 +1079,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
const ContentType contentType = GetParam();
@@ -1155,13 +1126,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
AWAIT_READY(__recover);
- url = http::URL(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- endpointDetector.reset(new ConstantEndpointDetector(url));
+ endpointDetector =
+ resource_provider::createEndpointDetector(agent.get()->pid);
resourceProvider.reset(new v1::MockResourceProvider(
resourceProviderInfo,
@@ -1223,21 +1189,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
v1::Resources(disk)));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
const ContentType contentType = GetParam();
@@ -1314,21 +1267,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
new v1::MockResourceProvider(resourceProviderInfo));
// Start and register a resource provider.
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- http::URL url(
- scheme,
- agent.get()->pid.address.ip,
- agent.get()->pid.address.port,
- agent.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
Future<Event::Subscribed> subscribed1;
EXPECT_CALL(*resourceProvider1, subscribed(_))
http://git-wip-us.apache.org/repos/asf/mesos/blob/75659c96/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 59e3065..df02b46 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -46,8 +46,6 @@
#include <process/reap.hpp>
#include <process/subprocess.hpp>
-#include <process/ssl/flags.hpp>
-
#include <stout/hashset.hpp>
#include <stout/json.hpp>
#include <stout/none.hpp>
@@ -8704,21 +8702,8 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
EXPECT_CALL(resourceProvider, connected())
.WillOnce(FutureSatisfy(&connected));
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- process::http::URL url(
- scheme,
- slave.get()->pid.address.ip,
- slave.get()->pid.address.port,
- slave.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(slave.get()->pid));
resourceProvider.start(
endpointDetector,
@@ -8836,21 +8821,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ResourceProviderPublishAll)
v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- process::http::URL url(
- scheme,
- slave.get()->pid.address.ip,
- slave.get()->pid.address.port,
- slave.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(slave.get()->pid));
resourceProvider.start(
endpointDetector,
@@ -9151,21 +9123,8 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
resourceProviderInfo,
resourceProviderResources);
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- process::http::URL url(
- scheme,
- slave.get()->pid.address.ip,
- slave.get()->pid.address.port,
- slave.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(slave.get()->pid));
updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
@@ -9381,21 +9340,8 @@ TEST_F(SlaveTest, RunTaskResourceVersions)
resourceProviderInfo,
resourceProviderResources);
- string scheme = "http";
-
-#ifdef USE_SSL_SOCKET
- if (process::network::openssl::flags().enabled) {
- scheme = "https";
- }
-#endif
-
- process::http::URL url(
- scheme,
- slave.get()->pid.address.ip,
- slave.get()->pid.address.port,
- slave.get()->pid.id + "/api/v1/resource_provider");
-
- Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(slave.get()->pid));
updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
[2/2] mesos git commit: Added a resource provider test case.
Posted by bb...@apache.org.
Added a resource provider test case.
Review: https://reviews.apache.org/r/65126/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cf6b088
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cf6b088
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cf6b088
Branch: refs/heads/master
Commit: 8cf6b0882e8e393702c673e2b29ac6781cec0b85
Parents: 75659c9
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jan 17 15:23:02 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Jan 18 11:28:12 2018 +0100
----------------------------------------------------------------------
src/tests/master_tests.cpp | 128 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 128 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cf6b088/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 0edf224..d01f3fb 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8608,6 +8608,134 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(MasterTest, RegistryGcByCount)
}
+// This test verifies that a resource provider state update which
+// is not triggered by (re-)registration (e.g., when adding
+// resources) is correctly handled by agent and master: Offers are
+// rescinded and new resources are offered.
+TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
+{
+ Clock::pause();
+
+ // Start master and agent.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(agent);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::settle();
+ AWAIT_READY(updateSlaveMessage);
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ v1::Resource disk1 = v1::createDiskResource(
+ "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+ Owned<v1::MockResourceProvider> resourceProvider(
+ new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk1)));
+
+ // Start and register a resource provider with a single disk resource.
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
+
+ resourceProvider->start(
+ endpointDetector, ContentType::PROTOBUF, v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(updateSlaveMessage);
+ ASSERT_TRUE(resourceProvider->info.has_id());
+
+ disk1.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+
+ // Start and register a framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ EXPECT_CALL(*scheduler, subscribed(_, _));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ v1::Resources offeredResources = offers->offers(0).resources();
+ offeredResources.unallocate();
+
+ EXPECT_TRUE(offeredResources.contains(disk1));
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // Add another resource to the resource provider. By sending
+ // 'UPDATE_STATE', the resource provider manager will be notified
+ // of the new resource.
+ v1::Resource disk2 = v1::createDiskResource(
+ "100", "*", None(), None(), v1::createDiskSourceBlock());
+ disk2.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+
+ v1::resource_provider::Call call;
+ call.mutable_resource_provider_id()->CopyFrom(resourceProvider->info.id());
+ call.set_type(v1::resource_provider::Call::UPDATE_STATE);
+
+ v1::resource_provider::Call::UpdateState* updateState =
+ call.mutable_update_state();
+ updateState->add_resources()->CopyFrom(disk1);
+ updateState->add_resources()->CopyFrom(disk2);
+ updateState->mutable_resource_version_uuid()->set_value(
+ id::UUID::random().toBytes());
+
+ EXPECT_CALL(*scheduler, rescind(_, _));
+
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ resourceProvider->send(call);
+
+ AWAIT_READY(updateSlaveMessage);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ offeredResources = offers->offers(0).resources();
+ offeredResources.unallocate();
+
+ EXPECT_TRUE(offeredResources.contains(disk1));
+ EXPECT_TRUE(offeredResources.contains(disk2));
+}
+
+
class MasterTestPrePostReservationRefinement
: public MasterTest,
public WithParamInterface<bool> {