You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/09/13 17:55:36 UTC

[2/4] mesos git commit: Cleaned up DefaultExecutor tests.

Cleaned up DefaultExecutor tests.

Updated the DefaultExecutor tests to use test helpers where possible.
Also made the boilerplate initialization code consistent across tests.

Review: https://reviews.apache.org/r/61982/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e7df335a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e7df335a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e7df335a

Branch: refs/heads/master
Commit: e7df335a484131450ff15bcd2ee325ea40dc8155
Parents: 1f4d7ef
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Wed Sep 13 09:21:23 2017 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Sep 13 10:50:58 2017 -0700

----------------------------------------------------------------------
 src/tests/default_executor_tests.cpp | 816 +++++++++---------------------
 1 file changed, 239 insertions(+), 577 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e7df335a/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 219891c..0815fb8 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -136,16 +136,8 @@ TEST_P(DefaultExecutorTest, TaskRunning)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -159,14 +151,10 @@ TEST_P(DefaultExecutorTest, TaskRunning)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -174,11 +162,12 @@ TEST_P(DefaultExecutorTest, TaskRunning)
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(resources);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -186,35 +175,19 @@ TEST_P(DefaultExecutorTest, TaskRunning)
   const v1::Offer& offer = offers->offers(0);
   const v1::AgentID& agentId = offer.agent_id();
 
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
-
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo);
-
   Future<v1::scheduler::Event::Update> update;
   EXPECT_CALL(*scheduler, update(_, _))
     .WillOnce(FutureArg<1>(&update));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
 
   AWAIT_READY(update);
 
@@ -270,16 +243,8 @@ TEST_P(DefaultExecutorTest, KillTask)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -292,14 +257,10 @@ TEST_P(DefaultExecutorTest, KillTask)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -307,11 +268,12 @@ TEST_P(DefaultExecutorTest, KillTask)
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(resources);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers1);
   EXPECT_FALSE(offers1->offers().empty());
@@ -325,17 +287,19 @@ TEST_P(DefaultExecutorTest, KillTask)
   v1::TaskInfo taskInfo2 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  v1::TaskGroupInfo taskGroup1;
-  taskGroup1.add_tasks()->CopyFrom(taskInfo1);
-  taskGroup1.add_tasks()->CopyFrom(taskInfo2);
-
   const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()};
 
   Future<v1::scheduler::Event::Update> runningUpdate1;
   Future<v1::scheduler::Event::Update> runningUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&runningUpdate1))
-    .WillOnce(FutureArg<1>(&runningUpdate2));
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   Future<v1::scheduler::Event::Offers> offers2;
   EXPECT_CALL(*scheduler, offers(_, _))
@@ -343,25 +307,14 @@ TEST_P(DefaultExecutorTest, KillTask)
     .WillRepeatedly(Return());
 
   {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer1.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+    v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
+        executorInfo,
+        v1::createTaskGroupInfo({taskInfo1, taskInfo2}));
+    Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup});
 
     // Set a 0s filter to immediately get another offer to launch
     // the second task group.
-    accept->mutable_filters()->set_refuse_seconds(0);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup1);
+    call.mutable_accept()->mutable_filters()->set_refuse_seconds(0);
 
     mesos.send(call);
   }
@@ -386,88 +339,25 @@ TEST_P(DefaultExecutorTest, KillTask)
   v1::TaskInfo taskInfo3 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  v1::TaskGroupInfo taskGroup2;
-  taskGroup2.add_tasks()->CopyFrom(taskInfo3);
-
   Future<v1::scheduler::Event::Update> runningUpdate3;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&runningUpdate3));
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, offer2.agent_id())));
 
   // Launch the second task group.
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer2.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup2);
-
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer2,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
 
   AWAIT_READY(runningUpdate3);
   ASSERT_EQ(TASK_RUNNING, runningUpdate3->status().state());
   ASSERT_EQ(taskInfo3.task_id(), runningUpdate3->status().task_id());
 
-  // Acknowledge the TASK_RUNNING updates to receive the next updates.
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate1->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
-    acknowledge->set_uuid(runningUpdate1->status().uuid());
-
-    mesos.send(call);
-  }
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate2->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
-    acknowledge->set_uuid(runningUpdate2->status().uuid());
-
-    mesos.send(call);
-  }
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate3->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id());
-    acknowledge->set_uuid(runningUpdate3->status().uuid());
-
-    mesos.send(call);
-  }
-
   Future<v1::scheduler::Event::Update> killedUpdate1;
   Future<v1::scheduler::Event::Update> killedUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
@@ -556,16 +446,8 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -579,14 +461,10 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -594,11 +472,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
   v1::Resources resources =
       v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(resources);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -614,35 +493,18 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure)
 
   const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
 
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo1);
-  taskGroup.add_tasks()->CopyFrom(taskInfo2);
-
   Future<v1::scheduler::Event::Update> runningUpdate1;
   Future<v1::scheduler::Event::Update> runningUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
     .WillOnce(FutureArg<1>(&runningUpdate1))
     .WillOnce(FutureArg<1>(&runningUpdate2));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
   AWAIT_READY(runningUpdate1);
   ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
@@ -735,16 +597,8 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -758,14 +612,10 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -773,11 +623,14 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
   executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(resources);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -785,37 +638,21 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor)
   const v1::Offer& offer = offers->offers(0);
   const v1::AgentID& agentId = offer.agent_id();
 
-  v1::TaskInfo taskInfo =
-    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
-
-  taskInfo.mutable_executor()->CopyFrom(executorInfo);
-
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo);
-
   Future<v1::scheduler::Event::Update> update;
   EXPECT_CALL(*scheduler, update(_, _))
     .WillOnce(FutureArg<1>(&update));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+  taskInfo.mutable_executor()->CopyFrom(executorInfo);
 
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
 
   AWAIT_READY(update);
 
@@ -837,23 +674,19 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
   flags.containerizers = GetParam();
 
   Owned<MasterDetector> detector = master.get()->createDetector();
-
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
   ASSERT_SOME(slave);
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(
-        v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO),
-        FutureSatisfy(&connected)));
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
-  Future<Event::Subscribed> subscribed;
+  Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
     .WillOnce(FutureArg<1>(&subscribed));
 
-  Future<Event::Offers> offers;
+  Future<v1::scheduler::Event::Offers> offers;
   EXPECT_CALL(*scheduler, offers(_, _))
     .WillOnce(FutureArg<1>(&offers))
     .WillRepeatedly(Return());
@@ -866,53 +699,47 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask)
       ContentType::PROTOBUF,
       scheduler);
 
-  AWAIT_READY(connected);
-
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
+  v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      "test_default_executor",
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
-      "cpus:0.1;mem:32;disk:32",
-      v1::ExecutorInfo::DEFAULT);
-
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
 
   const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
 
-  v1::TaskInfo task1 = v1::createTask(
-      offer.agent_id(),
-      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
-      v1::createCommandInfo(SLEEP_COMMAND(1000)));
-
-  v1::TaskInfo task2 = v1::createTask(
-      offer.agent_id(),
-      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
-      v1::createCommandInfo(SLEEP_COMMAND(1000)));
+  v1::TaskInfo task1 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
-      executorInfo,
-      v1::createTaskGroupInfo({task1, task2}));
+  v1::TaskInfo task2 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
   Future<Event::Update> updateRunning1;
   Future<Event::Update> updateRunning2;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(DoAll(
-        FutureArg<1>(&updateRunning1),
-        v1::scheduler::SendAcknowledge(
-            frameworkId,
-            offer.agent_id())))
-    .WillOnce(DoAll(
-        FutureArg<1>(&updateRunning2),
-        v1::scheduler::SendAcknowledge(
-            frameworkId,
-            offer.agent_id())));
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&updateRunning1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&updateRunning2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
-  mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup}));
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({task1, task2}))}));
 
   AWAIT_READY(updateRunning1);
   AWAIT_READY(updateRunning2);
@@ -952,16 +779,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -975,14 +794,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -990,11 +805,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_resources()->CopyFrom(resources);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -1003,62 +819,31 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnTaskFailure)
   const v1::AgentID& agentId = offer.agent_id();
 
   // The task exits with a non-zero status code.
-  v1::TaskInfo taskInfo1 = v1::createTask(agentId, resources, "exit 1");
-
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo1);
+  v1::TaskInfo taskInfo = v1::createTask(agentId, resources, "exit 1");
 
   Future<v1::scheduler::Event::Update> runningUpdate;
   Future<v1::scheduler::Event::Update> failedUpdate;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&runningUpdate))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
     .WillOnce(FutureArg<1>(&failedUpdate));
 
   Future<v1::scheduler::Event::Failure> executorFailure;
   EXPECT_CALL(*scheduler, failure(_, _))
     .WillOnce(FutureArg<1>(&executorFailure));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo}))}));
 
   AWAIT_READY(runningUpdate);
   ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
 
-  // Acknowledge the TASK_RUNNING update to receive the next update.
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
-    acknowledge->set_uuid(runningUpdate->status().uuid());
-
-    mesos.send(call);
-  }
-
   AWAIT_READY(failedUpdate);
   ASSERT_EQ(TASK_FAILED, failedUpdate->status().state());
 
@@ -1088,16 +873,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1111,14 +888,10 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
@@ -1126,11 +899,12 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(resources);
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      resources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -1146,41 +920,30 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   v1::TaskInfo taskInfo2 =
     v1::createTask(agentId, resources, SLEEP_COMMAND(1000));
 
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo1);
-  taskGroup.add_tasks()->CopyFrom(taskInfo2);
-
   const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()};
 
   Future<v1::scheduler::Event::Update> runningUpdate1;
   Future<v1::scheduler::Event::Update> runningUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&runningUpdate1))
-    .WillOnce(FutureArg<1>(&runningUpdate2));
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
 
   Future<v1::scheduler::Event::Failure> executorFailure;
   EXPECT_CALL(*scheduler, failure(_, _))
     .WillOnce(FutureArg<1>(&executorFailure));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
-    mesos.send(call);
-  }
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo1, taskInfo2}))}));
 
   AWAIT_READY(runningUpdate1);
   ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
@@ -1200,40 +963,6 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask)
   EXPECT_CALL(*scheduler, update(_, _))
     .WillOnce(FutureArg<1>(&finishedUpdate));
 
-  // Acknowledge the TASK_RUNNING updates to receive the next updates.
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate1->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
-    acknowledge->set_uuid(runningUpdate1->status().uuid());
-
-    mesos.send(call);
-  }
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
-
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-
-    acknowledge->mutable_task_id()->CopyFrom(
-        runningUpdate2->status().task_id());
-
-    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
-    acknowledge->set_uuid(runningUpdate2->status().uuid());
-
-    mesos.send(call);
-  }
-
   AWAIT_READY(finishedUpdate);
   ASSERT_EQ(TASK_FINISHED, finishedUpdate->status().state());
   ASSERT_EQ(taskInfo1.task_id(), finishedUpdate->status().task_id());
@@ -1286,19 +1015,11 @@ TEST_P(DefaultExecutorTest, ReservedResources)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
-  EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(FutureSatisfy(&connected));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
-
   v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.set_role("role");
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1306,23 +1027,26 @@ TEST_P(DefaultExecutorTest, ReservedResources)
 
   Future<v1::scheduler::Event::Offers> offers;
   EXPECT_CALL(*scheduler, offers(_, _))
-    .WillOnce(FutureArg<1>(&offers));
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
 
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  {
-    Call call;
-    call.set_type(Call::SUBSCRIBE);
-    Call::Subscribe* subscribe = call.mutable_subscribe();
-    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
-
-    mesos.send(call);
-  }
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
 
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
   v1::Resources unreserved =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
@@ -1331,50 +1055,26 @@ TEST_P(DefaultExecutorTest, ReservedResources)
     unreserved.pushReservation(v1::createDynamicReservationInfo(
         frameworkInfo.role(), frameworkInfo.principal()));
 
-  v1::ExecutorInfo executorInfo;
-  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
-  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(reserved);
-
-  AWAIT_READY(offers);
-  EXPECT_FALSE(offers->offers().empty());
-
-  const v1::Offer& offer = offers->offers(0);
-  const v1::AgentID& agentId = offer.agent_id();
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      reserved,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   // Launch the task using unreserved resources.
   v1::TaskInfo taskInfo =
     v1::createTask(agentId, unreserved, SLEEP_COMMAND(1000));
 
-  v1::TaskGroupInfo taskGroup;
-  taskGroup.add_tasks()->CopyFrom(taskInfo);
+  v1::Offer::Operation reserve = v1::RESERVE(reserved);
+  v1::Offer::Operation launchGroup =
+    v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}));
 
   Future<v1::scheduler::Event::Update> runningUpdate;
   EXPECT_CALL(*scheduler, update(_, _))
     .WillOnce(FutureArg<1>(&runningUpdate));
 
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACCEPT);
-
-    Call::Accept* accept = call.mutable_accept();
-    accept->add_offer_ids()->CopyFrom(offer.id());
-
-    accept->add_operations()->CopyFrom(v1::RESERVE(reserved));
-
-    v1::Offer::Operation* operation = accept->add_operations();
-    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
-
-    v1::Offer::Operation::LaunchGroup* launchGroup =
-      operation->mutable_launch_group();
-
-    launchGroup->mutable_executor()->CopyFrom(executorInfo);
-    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
-
-    mesos.send(call);
-  }
+  mesos.send(v1::createCallAccept(frameworkId, offer, {reserve, launchGroup}));
 
   AWAIT_READY(runningUpdate);
   ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
@@ -1415,17 +1115,8 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO),
-                    FutureSatisfy(&connected)));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1438,17 +1129,20 @@ TEST_P(DefaultExecutorTest, SigkillExecutor)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
   AWAIT_READY(subscribed);
 
   v1::FrameworkID frameworkId(subscribed->framework_id());
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      "test_default_executor",
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
       "cpus:0.1;mem:32;disk:32",
-      v1::ExecutorInfo::DEFAULT);
-
-  // Update `executorInfo` with the subscribed `frameworkId`.
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->offers().empty());
@@ -1523,19 +1217,8 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
 
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
-
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
-                    FutureSatisfy(&connected)));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
 
   Future<v1::scheduler::Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1548,17 +1231,20 @@ TEST_P(DefaultExecutorTest, ROOT_MultiTaskgroupSharePidNamespace)
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
   AWAIT_READY(subscribed);
 
   v1::FrameworkID frameworkId(subscribed->framework_id());
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      "test_default_executor",
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
       "cpus:0.1;mem:32;disk:32",
-      v1::ExecutorInfo::DEFAULT);
-
-  // Update `executorInfo` with the subscribed `frameworkId`.
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers1);
   EXPECT_FALSE(offers1->offers().empty());
@@ -1745,17 +1431,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
   frameworkInfo.set_role(DEFAULT_TEST_ROLE);
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
-                    FutureSatisfy(&connected)));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
 
   Future<Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1769,6 +1446,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
@@ -1791,13 +1473,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get();
 
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      v1::DEFAULT_EXECUTOR_ID.value(),
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
-      None(),
-      v1::ExecutorInfo::DEFAULT);
-
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(executorResources);
+      executorResources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -1888,17 +1568,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
   frameworkInfo.set_role(DEFAULT_TEST_ROLE);
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
-                    FutureSatisfy(&connected)));
-
-  v1::scheduler::TestMesos mesos(
-      master.get()->pid,
-      ContentType::PROTOBUF,
-      scheduler);
-
-  AWAIT_READY(connected);
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
 
   Future<Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -1912,6 +1583,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   EXPECT_CALL(*scheduler, heartbeat(_))
     .WillRepeatedly(Return()); // Ignore heartbeats.
 
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
@@ -1919,13 +1595,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      v1::DEFAULT_EXECUTOR_ID.value(),
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
-      None(),
-      v1::ExecutorInfo::DEFAULT);
-
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(unreserved);
+      unreserved,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   EXPECT_FALSE(offers->offers().empty());
@@ -2012,10 +1686,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
   frameworkInfo.set_role(DEFAULT_TEST_ROLE);
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
-                    FutureSatisfy(&connected)));
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
 
   Future<Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -2034,8 +1706,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
       ContentType::PROTOBUF,
       scheduler);
 
-  AWAIT_READY(connected);
-
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
@@ -2064,13 +1734,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     individualResources.apply(v1::CREATE(executorVolume)).get();
 
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      v1::DEFAULT_EXECUTOR_ID.value(),
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
-      None(),
-      v1::ExecutorInfo::DEFAULT);
-
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(executorResources);
+      executorResources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->offers().empty());
@@ -2227,10 +1895,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
   frameworkInfo.set_role(DEFAULT_TEST_ROLE);
 
-  Future<Nothing> connected;
   EXPECT_CALL(*scheduler, connected(_))
-    .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo),
-                    FutureSatisfy(&connected)));
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
 
   Future<Event::Subscribed> subscribed;
   EXPECT_CALL(*scheduler, subscribed(_, _))
@@ -2249,8 +1915,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
       ContentType::PROTOBUF,
       scheduler);
 
-  AWAIT_READY(connected);
-
   AWAIT_READY(subscribed);
   v1::FrameworkID frameworkId(subscribed->framework_id());
 
@@ -2279,13 +1943,11 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
     individualResources.apply(v1::CREATE(executorVolume)).get();
 
   v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
-      v1::DEFAULT_EXECUTOR_ID.value(),
+      v1::DEFAULT_EXECUTOR_ID,
       None(),
-      None(),
-      v1::ExecutorInfo::DEFAULT);
-
-  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
-  executorInfo.mutable_resources()->CopyFrom(executorResources);
+      executorResources,
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
 
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->offers().empty());