You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/16 01:03:40 UTC
[5/5] mesos git commit: Added fault tolerance tests for the V1 API.
Added fault tolerance tests for the V1 API.
Similar to already existing tests in `fault_tolerance_tests.cpp`,
this change adds fault tolerance tests for HTTP schedulers/executors.
Some tests have been omitted because they are no longer valid, given
the persistent streaming connection between the master and the
scheduler in the new API.
Review: https://reviews.apache.org/r/44733/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e85b743
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e85b743
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e85b743
Branch: refs/heads/master
Commit: 3e85b743a169b89bda18af79f197c5e921261cc8
Parents: 8c35ee3
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:15 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:15 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/http_fault_tolerance_tests.cpp | 958 ++++++++++++++++++++++++++
2 files changed, 959 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ee5a65..9dd21b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1845,6 +1845,7 @@ mesos_tests_SOURCES = \
tests/hierarchical_allocator_tests.cpp \
tests/hook_tests.cpp \
tests/http_authentication_tests.cpp \
+ tests/http_fault_tolerance_tests.cpp \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/tests/http_fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_fault_tolerance_tests.cpp b/src/tests/http_fault_tolerance_tests.cpp
new file mode 100644
index 0000000..7c7f3d9
--- /dev/null
+++ b/src/tests/http_fault_tolerance_tests.cpp
@@ -0,0 +1,958 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "master/allocator/mesos/allocator.hpp"
+
+#include "master/master.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::allocator::MesosAllocatorProcess;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// NOTE: These tests are for the v1 HTTP API and are semantically similar to
+// the corresponding tests in `src/tests/fault_tolerance_tests.cpp`.
+//
+// Empty fixture: the helpers used by the tests below (`CreateMasterFlags`,
+// `StartMaster`, `StartSlave`, `Shutdown`) are presumably inherited from
+// `MesosTest`.
+class HttpFaultToleranceTest : public MesosTest {};
+
+
+// This test verifies that a framework attempting to subscribe
+// after its failover timeout has elapsed is disallowed.
+//
+// Flow: subscribe a first scheduler whose `FrameworkInfo` carries a
+// two-week failover timeout, let its library go out of scope, advance the
+// paused clock by that timeout so the master runs `frameworkFailoverTimeout`,
+// then re-subscribe with the recorded framework id and expect an `ERROR`
+// event.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterFailoverTimeout)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ v1::FrameworkInfo frameworkInfo = DEFAULT_V1_FRAMEWORK_INFO;
+ frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ // Installed up front so the allocator dispatch that follows the first
+ // scheduler going away (end of the scope below) cannot be missed.
+ Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
+ _, &master::allocator::MesosAllocatorProcess::deactivateFramework);
+
+ v1::FrameworkID frameworkId;
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ // Launch the first (i.e., failing) scheduler and wait until it receives
+ // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
+ // The scope destroys `schedulerLibrary` before the failover timeout is
+ // simulated, which presumably closes its connection to the master.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ // Remember the assigned id so the second scheduler can try to reuse it.
+ frameworkId = subscribed->framework_id();
+ }
+
+ // Wait until master schedules the framework for removal.
+ AWAIT_READY(deactivateFramework);
+
+ // Simulate framework failover timeout.
+ // Pausing makes the failover timer fire only when the clock is advanced
+ // explicitly; settling lets already-dispatched work finish first.
+ Clock::pause();
+ Clock::settle();
+
+ // `failover_timeout` holds seconds (it was set from `Weeks(2).secs()`).
+ Try<Duration> failoverTimeout =
+ Duration::create(frameworkInfo.failover_timeout());
+
+ ASSERT_SOME(failoverTimeout);
+
+ // Installed before advancing the clock so the dispatch is not missed.
+ Future<Nothing> frameworkFailoverTimeout =
+ FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
+
+ Clock::advance(failoverTimeout.get());
+
+ // Wait until master actually marks the framework as completed.
+ AWAIT_READY(frameworkFailoverTimeout);
+
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ // Framework should get `Error` event because the framework with this id
+ // is marked as completed.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ // Tolerate (but do not require) a disconnection following the error.
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ // Re-subscribe with the old id set both on the call and inside the
+ // `FrameworkInfo`, as a failed-over scheduler would.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(error);
+ }
+
+ Shutdown();
+}
+
+
+// This test verifies that a framework attempting to subscribe after teardown
+// is disallowed: re-subscribing with the id of a torn-down framework must
+// yield an `ERROR` event.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterTeardown)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ v1::FrameworkID frameworkId;
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ // Launch the first scheduler, wait until it receives a `SUBSCRIBED`
+ // event, then send `TEARDOWN` so the master marks the framework as
+ // completed before the second scheduler is launched.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ frameworkId = subscribed->framework_id();
+
+ // Both hooks are installed before `TEARDOWN` is sent so neither the
+ // allocator dispatch nor the disconnection can be missed.
+ Future<Nothing> removeFramework = FUTURE_DISPATCH(
+ _, &master::allocator::MesosAllocatorProcess::removeFramework);
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ // Teardown the scheduler now.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::TEARDOWN);
+
+ schedulerLibrary.send(call);
+ }
+
+ // Wait until master actually marks the framework as completed.
+ AWAIT_READY(removeFramework);
+
+ // Wait for `removeFramework()` to be completed on the master.
+ // NOTE: the clock is not resumed afterwards; it stays paused for the
+ // remainder of this test.
+ Clock::pause();
+ Clock::settle();
+
+ // The scheduler should eventually realize the disconnection.
+ AWAIT_READY(disconnected);
+ }
+
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ // Framework should get `Error` event because the framework
+ // with this id is marked as completed.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ // Tolerate (but do not require) a disconnection following the error.
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ // Re-subscribe reusing the torn-down framework's id.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(error);
+ }
+
+ Shutdown();
+}
+
+
+// This test checks that a failed over scheduler gets the retried status update
+// when the original instance dies without acknowledging the update.
+//
+// After the second scheduler subscribes with the same framework id, the test
+// pauses the clock and advances it by `slave::STATUS_UPDATE_RETRY_INTERVAL_MIN`
+// so the agent's retry of the unacknowledged update reaches the new instance.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverStatusUpdate)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ // The executor subscribes when it connects (`SendSubscribe`) and answers
+ // the task launch with a `TASK_RUNNING` update (`SendUpdateFromTask`).
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(executor::SendUpdateFromTask(
+ frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+ Future<Nothing> acknowledged;
+ EXPECT_CALL(*executor, acknowledged(_, _))
+ .WillOnce(FutureSatisfy(&acknowledged));
+
+ Future<Event::Update> update;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(acknowledged);
+ AWAIT_READY(update);
+
+ // Check that the optional fields are present before reading them, so a
+ // missing executor id fails here rather than being compared as a
+ // default-constructed value.
+ ASSERT_TRUE(update->status().has_executor_id());
+ EXPECT_TRUE(update->status().has_uuid());
+
+ EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+ EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+ // Failover the scheduler without acknowledging the status update.
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ // `FutureSatisfy` re-arms `connected` with a fresh future for scheduler2.
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ // Scheduler2 should receive the retried status update.
+ Future<Nothing> update2;
+ EXPECT_CALL(*scheduler2, update(_, _))
+ .WillOnce(FutureSatisfy(&update2))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Clock::pause();
+
+ // Now advance time enough for the reliable timeout to kick in and
+ // another status update to be sent.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+ AWAIT_READY(update2);
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler receives the executor to
+// framework message.
+//
+// The executor library handle is captured from the executor's `subscribed`
+// callback and used, after the scheduler failover, to send a `MESSAGE` call
+// that must be delivered to the second scheduler instance.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ // Captured from the executor's `subscribed` callback. Initialized to
+ // `nullptr` so that a missing callback is detected below instead of
+ // dereferencing an indeterminate pointer.
+ v1::executor::Mesos* executorLib = nullptr;
+ EXPECT_CALL(*executor, subscribed(_, _))
+ .WillOnce(SaveArg<0>(&executorLib));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ // `FutureSatisfy` re-arms `connected` with a fresh future for scheduler2.
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Future<Event::Message> message;
+ EXPECT_CALL(*scheduler2, message(_, _))
+ .WillOnce(FutureArg<1>(&message));
+
+ // The executor must have subscribed (and handed us its library) before
+ // it was asked to launch the task.
+ ASSERT_TRUE(executorLib != nullptr);
+
+ {
+ v1::executor::Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+ call.set_type(v1::executor::Call::MESSAGE);
+
+ // Named distinctly so it does not shadow the outer `message` future.
+ v1::executor::Call::Message* callMessage = call.mutable_message();
+ callMessage->set_data("hello world");
+
+ executorLib->send(call);
+ }
+
+ AWAIT_READY(message);
+ ASSERT_EQ("hello world", message->data());
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler is able to send a message
+// to the executor.
+//
+// After the second scheduler instance subscribes with the original framework
+// id, it sends a `MESSAGE` call addressed to the agent from the original offer
+// and to this test's executor; the executor must receive the payload.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ // `FutureSatisfy` re-arms `connected` with a fresh future for scheduler2.
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Future<v1::executor::Event::Message> message;
+ EXPECT_CALL(*executor, message(_, _))
+ .WillOnce(FutureArg<1>(&message));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::MESSAGE);
+
+ // Named distinctly so it does not shadow the outer `message` future.
+ // Address the executor by the same id the containerizer and task use.
+ Call::Message* callMessage = call.mutable_message();
+ callMessage->mutable_agent_id()->CopyFrom(offer.agent_id());
+ callMessage->mutable_executor_id()->CopyFrom(evolve(executorId));
+ callMessage->set_data("hello world");
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(message);
+ ASSERT_EQ("hello world", message->data());
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test checks that a scheduler exit (an explicit `Call::TEARDOWN` from
+// the scheduler) shuts down the executor running its task.
+TEST_F(HttpFaultToleranceTest, SchedulerExit)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ // Use the same executor id that was handed to the containerizer.
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(*executor, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::TEARDOWN);
+
+ schedulerLibrary.send(call);
+ }
+
+ // Ensure that the executor receives an `Event::Shutdown` after the
+ // scheduler's teardown.
+ AWAIT_READY(shutdown);
+
+ Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {