You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/16 01:03:40 UTC

[5/5] mesos git commit: Added fault tolerance tests for the V1 API.

Added fault tolerance tests for the V1 API.

Similar to already existing tests in `fault_tolerance_tests.cpp`,
this change adds fault tolerance tests for HTTP schedulers/executors.
Some tests have been omitted since they were no longer valid now due
to the persistent streaming connection between the master and the
scheduler in the new API.

Review: https://reviews.apache.org/r/44733/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e85b743
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e85b743
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e85b743

Branch: refs/heads/master
Commit: 3e85b743a169b89bda18af79f197c5e921261cc8
Parents: 8c35ee3
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:15 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:15 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                          |   1 +
 src/tests/http_fault_tolerance_tests.cpp | 958 ++++++++++++++++++++++++++
 2 files changed, 959 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ee5a65..9dd21b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1845,6 +1845,7 @@ mesos_tests_SOURCES =						\
   tests/hierarchical_allocator_tests.cpp			\
   tests/hook_tests.cpp						\
   tests/http_authentication_tests.cpp				\
+  tests/http_fault_tolerance_tests.cpp				\
   tests/log_tests.cpp						\
   tests/logging_tests.cpp					\
   tests/main.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/tests/http_fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_fault_tolerance_tests.cpp b/src/tests/http_fault_tolerance_tests.cpp
new file mode 100644
index 0000000..7c7f3d9
--- /dev/null
+++ b/src/tests/http_fault_tolerance_tests.cpp
@@ -0,0 +1,958 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "master/allocator/mesos/allocator.hpp"
+
+#include "master/master.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::allocator::MesosAllocatorProcess;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// NOTE: These tests are for the v1 HTTP API and are semantically similar to
+// the corresponding tests in `src/tests/fault_tolerance_tests.cpp`.
+class HttpFaultToleranceTest : public MesosTest {};
+
+
+// This test verifies that a framework attempting to subscribe
+// after its failover timeout has elapsed is disallowed.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterFailoverTimeout)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  v1::FrameworkInfo frameworkInfo = DEFAULT_V1_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
+      _, &master::allocator::MesosAllocatorProcess::deactivateFramework);
+
+  v1::FrameworkID frameworkId;
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  // Launch the first (i.e., failing) scheduler and wait until it receives
+  // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    Future<Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    {
+      Call call;
+      call.set_type(Call::SUBSCRIBE);
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(subscribed);
+
+    frameworkId = subscribed->framework_id();
+  }
+
+  // Wait until master schedules the framework for removal.
+  AWAIT_READY(deactivateFramework);
+
+  // Simulate framework failover timeout.
+  Clock::pause();
+  Clock::settle();
+
+  Try<Duration> failoverTimeout =
+    Duration::create(frameworkInfo.failover_timeout());
+
+  ASSERT_SOME(failoverTimeout);
+
+  Future<Nothing> frameworkFailoverTimeout =
+    FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
+
+  Clock::advance(failoverTimeout.get());
+
+  // Wait until master actually marks the framework as completed.
+  AWAIT_READY(frameworkFailoverTimeout);
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    // Framework should get `Error` event because the framework with this id
+    // is marked as completed.
+    Future<Nothing> error;
+    EXPECT_CALL(*scheduler, error(_, _))
+      .WillOnce(FutureSatisfy(&error));
+
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .Times(AtMost(1));
+
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+      subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(error);
+  }
+
+  Shutdown();
+}
+
+
+// This test verifies that a framework attempting to subscribe after teardown
+// is disallowed.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterTeardown)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  v1::FrameworkID frameworkId;
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  // Launch the first (i.e., failing) scheduler and wait until it receives
+  // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    Future<Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    {
+      Call call;
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(subscribed);
+
+    frameworkId = subscribed->framework_id();
+
+    Future<Nothing> removeFramework = FUTURE_DISPATCH(
+      _, &master::allocator::MesosAllocatorProcess::removeFramework);
+
+    Future<Nothing> disconnected;
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .WillOnce(FutureSatisfy(&disconnected));
+
+    // Teardown the scheduler now.
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::TEARDOWN);
+
+      schedulerLibrary.send(call);
+    }
+
+    // Wait until master actually marks the framework as completed.
+    AWAIT_READY(removeFramework);
+
+    // Wait for `removeFramework()` to be completed on the master.
+    Clock::pause();
+    Clock::settle();
+
+    // The scheduler should eventually realize the disconnection.
+    AWAIT_READY(disconnected);
+  }
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    // Framework should get `Error` event because the framework
+    // with this id is marked as completed.
+    Future<Nothing> error;
+    EXPECT_CALL(*scheduler, error(_, _))
+      .WillOnce(FutureSatisfy(&error));
+
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .Times(AtMost(1));
+
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+      subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(error);
+  }
+
+  Shutdown();
+}
+
+
+// This test checks that a failed over scheduler gets the retried status update
+// when the original instance dies without acknowledging the update.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverStatusUpdate)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(executor::SendUpdateFromTask(
+        frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+  Future<Nothing> acknowledged;
+  EXPECT_CALL(*executor, acknowledged(_, _))
+    .WillOnce(FutureSatisfy(&acknowledged));
+
+  Future<Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(acknowledged);
+  AWAIT_READY(update);
+
+  EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+  EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+  EXPECT_TRUE(update->status().has_executor_id());
+  EXPECT_TRUE(update->status().has_uuid());
+
+  // Failover the scheduler without acknowledging the status update.
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Failover to another scheduler instance.
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Scheduler2 should receive the retried status update.
+  Future<Nothing> update2;
+  EXPECT_CALL(*scheduler2, update(_, _))
+    .WillOnce(FutureSatisfy(&update2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Clock::pause();
+
+  // Now advance time enough for the reliable timeout to kick in and
+  // another status update to be sent.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(update2);
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler receives the executor to
+// framework message.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  v1::executor::Mesos* executorLib;
+  EXPECT_CALL(*executor, subscribed(_, _))
+    .WillOnce(SaveArg<0>(&executorLib));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Failover to another scheduler instance.
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Future<Event::Message> message;
+  EXPECT_CALL(*scheduler2, message(_, _))
+    .WillOnce(FutureArg<1>(&message));
+
+  {
+    v1::executor::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+    call.set_type(v1::executor::Call::MESSAGE);
+
+    v1::executor::Call::Message* message = call.mutable_message();
+    message->set_data("hello world");
+
+    executorLib->send(call);
+  }
+
+  AWAIT_READY(message);
+  ASSERT_EQ("hello world", message->data());
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler is able to send a message
+// to the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Failover to another scheduler instance.
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Future<v1::executor::Event::Message> message;
+  EXPECT_CALL(*executor, message(_, _))
+    .WillOnce(FutureArg<1>(&message));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::MESSAGE);
+
+    Call::Message* message = call.mutable_message();
+    message->mutable_agent_id()->CopyFrom(offer.agent_id());
+    message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
+    message->set_data("hello world");
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(message);
+  ASSERT_EQ("hello world", message->data());
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test checks that a scheduler exit shuts down the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerExit)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(*executor, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::TEARDOWN);
+
+    schedulerLibrary.send(call);
+  }
+
+  // Ensure that the executor receives a `Event::Shutdown` after the
+  // scheduler exit.
+  AWAIT_READY(shutdown);
+
+  Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {