You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/09/13 17:55:36 UTC
[2/4] mesos git commit: Cleaned up DefaultExecutor tests.
Cleaned up DefaultExecutor tests.
Updated the DefaultExecutor tests to use test helpers where possible.
Also made the boilerplate initialization code consistent across tests.
Review: https://reviews.apache.org/r/61982/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e7df335a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e7df335a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e7df335a
Branch: refs/heads/master
Commit: e7df335a484131450ff15bcd2ee325ea40dc8155
Parents: 1f4d7ef
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Wed Sep 13 09:21:23 2017 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Sep 13 10:50:58 2017 -0700
----------------------------------------------------------------------
src/tests/default_executor_tests.cpp | 816 +++++++++---------------------
1 file changed, 239 insertions(+), 577 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e7df335a/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 219891c..0815fb8 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -136,16 +136,8 @@ TEST_P(DefaultExecutorTest, TaskRunning)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -159,14 +151,10 @@ TEST_P(DefaultExecutorTest, TaskRunning)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -174,11 +162,12 @@ TEST_P(DefaultExecutorTest, TaskRunning)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(resources);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -186,35 +175,19 @@ TEST_P(DefaultExecutorTest, TaskRunning)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
- v1::TaskInfo taskInfo =
- v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
-
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo);
-
Future<v1::scheduler::Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
AWAIT_READY(update);
@@ -270,16 +243,8 @@ TEST_P(DefaultExecutorTest, KillTask)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -292,14 +257,10 @@ TEST_P(DefaultExecutorTest, KillTask)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -307,11 +268,12 @@ TEST_P(DefaultExecutorTest, KillTask)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(resources);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers1);
EXPECT_FALSE(offers1->offers().empty());
@@ -325,17 +287,19 @@ TEST_P(DefaultExecutorTest, KillTask)
v1::TaskInfo taskInfo2 =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- v1::TaskGroupInfo taskGroup1;
- taskGroup1.add_tasks()->CopyFrom(taskInfo1);
- taskGroup1.add_tasks()->CopyFrom(taskInfo2);
-
const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()};
Future<v1::scheduler::Event::Update> runningUpdate1;
Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&runningUpdate1))
- .WillOnce(FutureArg<1>(&runningUpdate2));
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
Future<v1::scheduler::Event::Offers> offers2;
EXPECT_CALL(*scheduler, offers(_, _))
@@ -343,25 +307,14 @@ TEST_P(DefaultExecutorTest, KillTask)
.WillRepeatedly(Return());
{
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer1.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+ v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
+ executorInfo,
+ v1::createTaskGroupInfo({taskInfo1, taskInfo2}));
+ Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup});
// Set a 0s filter to immediately get another offer to launch
// the second task group.
- accept->mutable_filters()->set_refuse_seconds(0);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup1);
+ call.mutable_accept()->mutable_filters()->set_refuse_seconds(0);
mesos.send(call);
}
@@ -386,88 +339,25 @@ TEST_P(DefaultExecutorTest, KillTask)
v1::TaskInfo taskInfo3 =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- v1::TaskGroupInfo taskGroup2;
- taskGroup2.add_tasks()->CopyFrom(taskInfo3);
-
Future<v1::scheduler::Event::Update> runningUpdate3;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&runningUpdate3));
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate3),
+ v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())));
// Launch the second task group.
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer2.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup2);
-
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer2,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
AWAIT_READY(runningUpdate3);
ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state());
ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id());
- // Acknowledge the TASK_RUNNING updates to receive the next updates.
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate1->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
- acknowledge->set_uuid(runningUpdate1->status().uuid());
-
- mesos.send(call);
- }
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate2->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
- acknowledge->set_uuid(runningUpdate2->status().uuid());
-
- mesos.send(call);
- }
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate3->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id());
- acknowledge->set_uuid(runningUpdate3->status().uuid());
-
- mesos.send(call);
- }
-
Future<v1::scheduler::Event::Update> killedUpdate1;
Future<v1::scheduler::Event::Update> killedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
@@ -556,16 +446,8 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -579,14 +461,10 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -594,11 +472,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(resources);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -614,35 +493,18 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo1);
- taskGroup.add_tasks()->CopyFrom(taskInfo2);
-
Future<v1::scheduler::Event::Update> runningUpdate1;
Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
AWAIT_READY(runningUpdate1);
ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
@@ -735,16 +597,8 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -758,14 +612,10 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -773,11 +623,14 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
+
executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(resources);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -785,37 +638,21 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
const v1::Offer& offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
- v1::TaskInfo taskInfo =
- v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
-
- taskInfo.mutable_executor()->CopyFrom(executorInfo);
-
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo);
-
Future<v1::scheduler::Event::Update> update;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&update));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+ taskInfo.mutable_executor()->CopyFrom(executorInfo);
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
AWAIT_READY(update);
@@ -837,23 +674,19 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
flags.containerizers = GetParam();
Owned<MasterDetector> detector = master.get()->createDetector();
-
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(
- v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO),
- FutureSatisfy(&connected)));
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
- Future<Event::Subscribed> subscribed;
+ Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
- Future<Event::Offers> offers;
+ Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers))
.WillRepeatedly(Return());
@@ -866,53 +699,47 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
ContentType::PROTOBUF,
scheduler);
- AWAIT_READY(connected);
-
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
+ v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- "test_default_executor",
+ v1::DEFAULT_EXECUTOR_ID,
None(),
- "cpus:0.1;mem:32;disk:32",
- v1::ExecutorInfo::DEFAULT);
-
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
- v1::TaskInfo task1 = v1::createTask(
- offer.agent_id(),
- v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
- v1::createCommandInfo(SLEEP_COMMAND(1000)));
-
- v1::TaskInfo task2 = v1::createTask(
- offer.agent_id(),
- v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
- v1::createCommandInfo(SLEEP_COMMAND(1000)));
+ v1::TaskInfo task1 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
- executorInfo,
- v1::createTaskGroupInfo({task1, task2}));
+ v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
Future<Event::Update> updateRunning1;
Future<Event::Update> updateRunning2;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(DoAll(
- FutureArg<1>(&updateRunning1),
- v1::scheduler::SendAcknowledge(
- frameworkId,
- offer.agent_id())))
- .WillOnce(DoAll(
- FutureArg<1>(&updateRunning2),
- v1::scheduler::SendAcknowledge(
- frameworkId,
- offer.agent_id())));
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateRunning1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateRunning2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
- mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({task1, task2}))}));
AWAIT_READY(updateRunning1);
AWAIT_READY(updateRunning2);
@@ -952,16 +779,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -975,14 +794,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -990,11 +805,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_resources()->CopyFrom(resources);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -1003,62 +819,31 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
const v1::AgentID& agentId = offer.agent_id();
// The task exits with a non-zero status code.
- v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 1");
-
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo1);
+ v1::TaskInfo taskInfo = v1::createTask(agentId, resources, "exit 1");
Future<v1::scheduler::Event::Update> runningUpdate;
Future<v1::scheduler::Event::Update> failedUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&runningUpdate))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
.WillOnce(FutureArg<1>(&failedUpdate));
Future<v1::scheduler::Event::Failure> executorFailure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureArg<1>(&executorFailure));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
AWAIT_READY(runningUpdate);
ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
- // Acknowledge the TASK_RUNNING update to receive the next update.
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(runningUpdate->status().uuid());
-
- mesos.send(call);
- }
-
AWAIT_READY(failedUpdate);
ASSERT_EQ(TASK_FAILED, failedUpdate->status().state());
@@ -1088,16 +873,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1111,14 +888,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -1126,11 +899,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(resources);
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ resources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -1146,41 +920,30 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
v1::TaskInfo taskInfo2 =
v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo1);
- taskGroup.add_tasks()->CopyFrom(taskInfo2);
-
const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
Future<v1::scheduler::Event::Update> runningUpdate1;
Future<v1::scheduler::Event::Update> runningUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&runningUpdate1))
- .WillOnce(FutureArg<1>(&runningUpdate2));
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate1),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&runningUpdate2),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)));
Future<v1::scheduler::Event::Failure> executorFailure;
EXPECT_CALL(*scheduler, failure(_, _))
.WillOnce(FutureArg<1>(&executorFailure));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
- mesos.send(call);
- }
+ mesos.send(
+ v1::createCallAccept(
+ frameworkId,
+ offer,
+ {v1::LAUNCH_GROUP(
+ executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
AWAIT_READY(runningUpdate1);
ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
@@ -1200,40 +963,6 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&finishedUpdate));
- // Acknowledge the TASK_RUNNING updates to receive the next updates.
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate1->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(runningUpdate1->status().uuid());
-
- mesos.send(call);
- }
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
-
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
- acknowledge->mutable_task_id()->CopyFrom(
- runningUpdate2->status().task_id());
-
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(runningUpdate2->status().uuid());
-
- mesos.send(call);
- }
-
AWAIT_READY(finishedUpdate);
ASSERT_EQ(TASK_FINISHED, finishedUpdate->status().state());
ASSERT_EQ(taskInfo1.task_id(), finishedUpdate->status().task_id());
@@ -1286,19 +1015,11 @@ TEST_P(DefaultExecutorTest, ReservedResources)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
- EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(FutureSatisfy(&connected));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
-
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.set_role("role");
+ frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1306,23 +1027,26 @@ TEST_P(DefaultExecutorTest, ReservedResources)
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
- .WillOnce(FutureArg<1>(&offers));
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
- {
- Call call;
- call.set_type(Call::SUBSCRIBE);
- Call::Subscribe* subscribe = call.mutable_subscribe();
- subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
-
- mesos.send(call);
- }
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID& agentId = offer.agent_id();
+
v1::Resources unreserved =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
@@ -1331,50 +1055,26 @@ TEST_P(DefaultExecutorTest, ReservedResources)
unreserved.pushReservation(v1::createDynamicReservationInfo(
frameworkInfo.role(), frameworkInfo.principal()));
- v1::ExecutorInfo executorInfo;
- executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
- executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(reserved);
-
- AWAIT_READY(offers);
- EXPECT_FALSE(offers->offers().empty());
-
- const v1::Offer& offer = offers->offers(0);
- const v1::AgentID& agentId = offer.agent_id();
+ v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+ v1::DEFAULT_EXECUTOR_ID,
+ None(),
+ reserved,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
// Launch the task using unreserved resources.
v1::TaskInfo taskInfo =
v1::createTask(agentId, unreserved, SLEEP_COMMAND(1000));
- v1::TaskGroupInfo taskGroup;
- taskGroup.add_tasks()->CopyFrom(taskInfo);
+ v1::Offer::Operation reserve = v1::RESERVE(reserved);
+ v1::Offer::Operation launchGroup =
+ v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}));
Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
.WillOnce(FutureArg<1>(&runningUpdate));
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACCEPT);
-
- Call::Accept* accept = call.mutable_accept();
- accept->add_offer_ids()->CopyFrom(offer.id());
-
- accept->add_operations()->CopyFrom(v1::RESERVE(reserved));
-
- v1::Offer::Operation* operation = accept->add_operations();
- operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
- v1::Offer::Operation::LaunchGroup* launchGroup =
- operation->mutable_launch_group();
-
- launchGroup->mutable_executor()->CopyFrom(executorInfo);
- launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
- mesos.send(call);
- }
+ mesos.send(v1::createCallAccept(frameworkId, offer, {reserve, launchGroup}));
AWAIT_READY(runningUpdate);
ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
@@ -1415,17 +1115,8 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO),
- FutureSatisfy(&connected)));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1438,17 +1129,20 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- "test_default_executor",
+ v1::DEFAULT_EXECUTOR_ID,
None(),
"cpus:0.1;mem:32;disk:32",
- v1::ExecutorInfo::DEFAULT);
-
- // Update `executorInfo` with the subscribed `frameworkId`.
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
@@ -1523,19 +1217,8 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
- v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
- FutureSatisfy(&connected)));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1548,17 +1231,20 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- "test_default_executor",
+ v1::DEFAULT_EXECUTOR_ID,
None(),
"cpus:0.1;mem:32;disk:32",
- v1::ExecutorInfo::DEFAULT);
-
- // Update `executorInfo` with the subscribed `frameworkId`.
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers1);
EXPECT_FALSE(offers1->offers().empty());
@@ -1745,17 +1431,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role(DEFAULT_TEST_ROLE);
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
- FutureSatisfy(&connected)));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1769,6 +1446,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -1791,13 +1473,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- v1::DEFAULT_EXECUTOR_ID.value(),
+ v1::DEFAULT_EXECUTOR_ID,
None(),
- None(),
- v1::ExecutorInfo::DEFAULT);
-
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(executorResources);
+ executorResources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -1888,17 +1568,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role(DEFAULT_TEST_ROLE);
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
- FutureSatisfy(&connected)));
-
- v1::scheduler::TestMesos mesos(
- master.get()->pid,
- ContentType::PROTOBUF,
- scheduler);
-
- AWAIT_READY(connected);
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1912,6 +1583,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -1919,13 +1595,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- v1::DEFAULT_EXECUTOR_ID.value(),
+ v1::DEFAULT_EXECUTOR_ID,
None(),
- None(),
- v1::ExecutorInfo::DEFAULT);
-
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(unreserved);
+ unreserved,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
EXPECT_FALSE(offers->offers().empty());
@@ -2012,10 +1686,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role(DEFAULT_TEST_ROLE);
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
- FutureSatisfy(&connected)));
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -2034,8 +1706,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
ContentType::PROTOBUF,
scheduler);
- AWAIT_READY(connected);
-
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -2064,13 +1734,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
individualResources.apply(v1::CREATE(executorVolume)).get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- v1::DEFAULT_EXECUTOR_ID.value(),
+ v1::DEFAULT_EXECUTOR_ID,
None(),
- None(),
- v1::ExecutorInfo::DEFAULT);
-
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(executorResources);
+ executorResources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
@@ -2227,10 +1895,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role(DEFAULT_TEST_ROLE);
- Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
- .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
- FutureSatisfy(&connected)));
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
Future<Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -2249,8 +1915,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
ContentType::PROTOBUF,
scheduler);
- AWAIT_READY(connected);
-
AWAIT_READY(subscribed);
v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -2279,13 +1943,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
individualResources.apply(v1::CREATE(executorVolume)).get();
v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
- v1::DEFAULT_EXECUTOR_ID.value(),
+ v1::DEFAULT_EXECUTOR_ID,
None(),
- None(),
- v1::ExecutorInfo::DEFAULT);
-
- executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
- executorInfo.mutable_resources()->CopyFrom(executorResources);
+ executorResources,
+ v1::ExecutorInfo::DEFAULT,
+ frameworkId);
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());