You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/07/27 21:41:06 UTC
[4/4] mesos git commit: Added test cases for framework streaming
events.
Added test cases for framework streaming events.
Added test cases for 'FRAMEWORK_ADDED', 'FRAMEWORK_UPDATED' and
'FRAMEWORK_REMOVED' events in v1 operator API.
Review: https://reviews.apache.org/r/60931/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a5ad763d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a5ad763d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a5ad763d
Branch: refs/heads/master
Commit: a5ad763d1c3a2a35b897255fe75e19b65ccea269
Parents: bb0e4f2
Author: Quinn Leng <qu...@gmail.com>
Authored: Thu Jul 27 14:32:01 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Thu Jul 27 14:38:44 2017 -0700
----------------------------------------------------------------------
src/tests/api_tests.cpp | 185 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 185 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a5ad763d/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index f22ca28..1d5b080 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -1922,6 +1922,191 @@ TEST_P(MasterAPITest, Subscribe)
}
+// This test tries to verify that a client subscribed to the 'api/v1' endpoint
+// can receive `FRAMEWORK_ADDED`, `FRAMEWORK_UPDATED` and 'FRAMEWORK_REMOVED'
+// events.
+TEST_P(MasterAPITest, FrameworksEvent)
+{
+ ContentType contentType = GetParam();
+
+ Try<Owned<cluster::Master>> master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ headers["Accept"] = stringify(contentType);
+
+ Future<http::Response> response = http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+ const v1::master::Response::GetState& getState =
+ event->get().subscribed().get_state();
+
+ EXPECT_EQ(0u, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(0u, getState.get_agents().agents_size());
+ EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+ EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+ event = decoder.read();
+ EXPECT_TRUE(event.isPending());
+
+ // Start a scheduler. The subscriber will receive a 'FRAMEWORK_ADDED' event
+ // when the scheduler subscribes with the master.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+ auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ contentType,
+ scheduler,
+ detector);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ // Set the timeout to a large value to avoid the framework being removed
+ // when it reconnects.
+ frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+ {
+ v1::scheduler::Call call;
+ call.set_type(v1::scheduler::Call::SUBSCRIBE);
+
+ v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId = subscribed->framework_id();
+ frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
+ AWAIT_READY(event);
+
+ {
+ EXPECT_EQ(v1::master::Event::FRAMEWORK_ADDED, event.get().get().type());
+
+ const v1::master::Response::GetFrameworks::Framework& framework =
+ event.get().get().framework_added().framework();
+
+ EXPECT_EQ(frameworkInfo, framework.framework_info());
+ EXPECT_TRUE(framework.active());
+ EXPECT_TRUE(framework.connected());
+ }
+
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Force a reconnection with the master. This should result in a
+ // 'FRAMEWORK_UPDATED' event when the scheduler re-registers with the master.
+ mesos.reconnect();
+
+ AWAIT_READY(disconnected);
+
+ // The scheduler should be able to immediately reconnect with the master.
+ AWAIT_READY(connected);
+
+ {
+ v1::scheduler::Call call;
+ call.set_type(v1::scheduler::Call::SUBSCRIBE);
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+
+ v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+ mesos.send(call);
+ }
+
+ event = decoder.read();
+ AWAIT_READY(event);
+
+ {
+ EXPECT_EQ(v1::master::Event::FRAMEWORK_UPDATED, event.get().get().type());
+
+ const v1::master::Response::GetFrameworks::Framework& framework =
+ event.get().get().framework_updated().framework();
+
+ EXPECT_EQ(frameworkInfo, framework.framework_info());
+ }
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ // Send a teardown request to the master to teardown the framework.
+ // The subscriber will receive a 'FRAMEWORK_REMOVED' event from the master.
+ {
+ Future<http::Response> response = process::http::post(
+ master.get()->pid,
+ "teardown",
+ createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+ "frameworkId=" + frameworkId.value());
+
+ AWAIT_READY(response);
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ }
+
+ AWAIT_READY(disconnected);
+
+ event = decoder.read();
+ AWAIT_READY(event);
+
+ {
+ EXPECT_EQ(v1::master::Event::FRAMEWORK_REMOVED, event.get().get().type());
+
+ const v1::FrameworkID& frameworkId_ =
+ event.get().get().framework_removed().framework_info().id();
+
+ EXPECT_EQ(frameworkId, frameworkId_);
+ }
+}
+
+
// This test verifies if we can retrieve the current quota status through
// `GET_QUOTA` call, after we set quota resources through `SET_QUOTA` call.
TEST_P(MasterAPITest, GetQuota)