You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/16 01:03:36 UTC
[1/5] mesos git commit: Made `ExamplesTest.TestHTTPFramework` use the
example http executor.
Repository: mesos
Updated Branches:
refs/heads/master 37958fd70 -> 3e85b743a
Made `ExamplesTest.TestHTTPFramework` use the example http executor.
Review: https://reviews.apache.org/r/44727/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fd7c4cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fd7c4cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fd7c4cc
Branch: refs/heads/master
Commit: 6fd7c4cc0fba37f2fe2b9f86332f8cf61343567b
Parents: 37958fd
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:02:50 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:02:50 2016 -0700
----------------------------------------------------------------------
src/examples/test_http_framework.cpp | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6fd7c4cc/src/examples/test_http_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_framework.cpp b/src/examples/test_http_framework.cpp
index 036cdcf..64c5097 100644
--- a/src/examples/test_http_framework.cpp
+++ b/src/examples/test_http_framework.cpp
@@ -369,12 +369,12 @@ int main(int argc, char** argv)
string uri;
Option<string> value = os::getenv("MESOS_BUILD_DIR");
if (value.isSome()) {
- uri = path::join(value.get(), "src", "test-executor");
+ uri = path::join(value.get(), "src", "test-http-executor");
} else {
uri = path::join(
os::realpath(Path(argv[0]).dirname()).get(),
"src",
- "test-executor");
+ "test-http-executor");
}
mesos::internal::logging::Flags flags;
[5/5] mesos git commit: Added fault tolerance tests for the V1 API.
Posted by vi...@apache.org.
Added fault tolerance tests for the V1 API.
Similar to already existing tests in `fault_tolerance_tests.cpp`,
this change adds fault tolerance tests for HTTP schedulers/executors.
Some tests have been omitted since they were no longer valid now due
to the persistent streaming connection between the master and the
scheduler in the new API.
Review: https://reviews.apache.org/r/44733/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e85b743
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e85b743
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e85b743
Branch: refs/heads/master
Commit: 3e85b743a169b89bda18af79f197c5e921261cc8
Parents: 8c35ee3
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:15 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:15 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/http_fault_tolerance_tests.cpp | 958 ++++++++++++++++++++++++++
2 files changed, 959 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ee5a65..9dd21b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1845,6 +1845,7 @@ mesos_tests_SOURCES = \
tests/hierarchical_allocator_tests.cpp \
tests/hook_tests.cpp \
tests/http_authentication_tests.cpp \
+ tests/http_fault_tolerance_tests.cpp \
tests/log_tests.cpp \
tests/logging_tests.cpp \
tests/main.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/tests/http_fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_fault_tolerance_tests.cpp b/src/tests/http_fault_tolerance_tests.cpp
new file mode 100644
index 0000000..7c7f3d9
--- /dev/null
+++ b/src/tests/http_fault_tolerance_tests.cpp
@@ -0,0 +1,958 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "master/allocator/mesos/allocator.hpp"
+
+#include "master/master.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::allocator::MesosAllocatorProcess;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// NOTE: These tests are for the v1 HTTP API and are semantically similar to
+// the corresponding tests in `src/tests/fault_tolerance_tests.cpp`.
+class HttpFaultToleranceTest : public MesosTest {};
+
+
+// This test verifies that a framework attempting to subscribe
+// after its failover timeout has elapsed is disallowed.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterFailoverTimeout)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ v1::FrameworkInfo frameworkInfo = DEFAULT_V1_FRAMEWORK_INFO;
+ frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
+ _, &master::allocator::MesosAllocatorProcess::deactivateFramework);
+
+ v1::FrameworkID frameworkId;
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ // Launch the first (i.e., failing) scheduler and wait until it receives
+ // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ frameworkId = subscribed->framework_id();
+ }
+
+ // Wait until master schedules the framework for removal.
+ AWAIT_READY(deactivateFramework);
+
+ // Simulate framework failover timeout.
+ Clock::pause();
+ Clock::settle();
+
+ Try<Duration> failoverTimeout =
+ Duration::create(frameworkInfo.failover_timeout());
+
+ ASSERT_SOME(failoverTimeout);
+
+ Future<Nothing> frameworkFailoverTimeout =
+ FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
+
+ Clock::advance(failoverTimeout.get());
+
+ // Wait until master actually marks the framework as completed.
+ AWAIT_READY(frameworkFailoverTimeout);
+
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ // Framework should get `Error` event because the framework with this id
+ // is marked as completed.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(error);
+ }
+
+ Shutdown();
+}
+
+
+// This test verifies that a framework attempting to subscribe after teardown
+// is disallowed.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterTeardown)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ v1::FrameworkID frameworkId;
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ // Launch the first (i.e., failing) scheduler and wait until it receives
+ // a `SUBSCRIBED` event to launch the second (i.e., failover) scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ frameworkId = subscribed->framework_id();
+
+ Future<Nothing> removeFramework = FUTURE_DISPATCH(
+ _, &master::allocator::MesosAllocatorProcess::removeFramework);
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ // Teardown the scheduler now.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::TEARDOWN);
+
+ schedulerLibrary.send(call);
+ }
+
+ // Wait until master actually marks the framework as completed.
+ AWAIT_READY(removeFramework);
+
+ // Wait for `removeFramework()` to be completed on the master.
+ Clock::pause();
+ Clock::settle();
+
+ // The scheduler should eventually realize the disconnection.
+ AWAIT_READY(disconnected);
+ }
+
+ // Now launch the second (i.e., failover) scheduler using the
+ // framework id recorded from the first scheduler.
+ {
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ scheduler::TestV1Mesos schedulerLibrary(
+ master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ // Framework should get `Error` event because the framework
+ // with this id is marked as completed.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(error);
+ }
+
+ Shutdown();
+}
+
+
+// This test checks that a failed over scheduler gets the retried status update
+// when the original instance dies without acknowledging the update.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverStatusUpdate)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(executor::SendUpdateFromTask(
+ frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+ Future<Nothing> acknowledged;
+ EXPECT_CALL(*executor, acknowledged(_, _))
+ .WillOnce(FutureSatisfy(&acknowledged));
+
+ Future<Event::Update> update;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(acknowledged);
+ AWAIT_READY(update);
+
+ EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+ EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+ EXPECT_TRUE(update->status().has_executor_id());
+ EXPECT_TRUE(update->status().has_uuid());
+
+ // Failover the scheduler without acknowledging the status update.
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ // Scheduler2 should receive the retried status update.
+ Future<Nothing> update2;
+ EXPECT_CALL(*scheduler2, update(_, _))
+ .WillOnce(FutureSatisfy(&update2))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Clock::pause();
+
+ // Now advance time enough for the reliable timeout to kick in and
+ // another status update to be sent.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+ AWAIT_READY(update2);
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler receives the executor to
+// framework message.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ v1::executor::Mesos* executorLib;
+ EXPECT_CALL(*executor, subscribed(_, _))
+ .WillOnce(SaveArg<0>(&executorLib));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Future<Event::Message> message;
+ EXPECT_CALL(*scheduler2, message(_, _))
+ .WillOnce(FutureArg<1>(&message));
+
+ {
+ v1::executor::Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+ call.set_type(v1::executor::Call::MESSAGE);
+
+ v1::executor::Call::Message* message = call.mutable_message();
+ message->set_data("hello world");
+
+ executorLib->send(call);
+ }
+
+ AWAIT_READY(message);
+ ASSERT_EQ("hello world", message->data());
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler is able to send a message
+// to the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+ EXPECT_CALL(*scheduler2, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ // Failover to another scheduler instance.
+ scheduler::TestV1Mesos schedulerLibrary2(
+ master.get(), contentType, scheduler2);
+
+ AWAIT_READY(connected);
+
+ // The previously connected scheduler instance should receive an
+ // error/disconnected event.
+ Future<Nothing> error;
+ EXPECT_CALL(*scheduler, error(_, _))
+ .WillOnce(FutureSatisfy(&error));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ EXPECT_CALL(*scheduler2, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler2, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+ subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(error);
+ AWAIT_READY(disconnected);
+ AWAIT_READY(subscribed);
+
+ EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+ Future<v1::executor::Event::Message> message;
+ EXPECT_CALL(*executor, message(_, _))
+ .WillOnce(FutureArg<1>(&message));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::MESSAGE);
+
+ Call::Message* message = call.mutable_message();
+ message->mutable_agent_id()->CopyFrom(offer.agent_id());
+ message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
+ message->set_data("hello world");
+
+ schedulerLibrary2.send(call);
+ }
+
+ AWAIT_READY(message);
+ ASSERT_EQ("hello world", message->data());
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*scheduler2, disconnected(_))
+ .Times(AtMost(1));
+
+ Shutdown();
+}
+
+
+// This test checks that a scheduler exit shuts down the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerExit)
+{
+ master::Flags flags = CreateMasterFlags();
+ flags.authenticate_frameworks = false;
+
+ Try<PID<Master>> master = StartMaster(flags);
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ Try<PID<Slave>> slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore future invocations.
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _));
+
+ Future<Nothing> launch;
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(FutureSatisfy(&launch));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offer.id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ schedulerLibrary.send(call);
+ }
+
+ AWAIT_READY(launch);
+
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .Times(AtMost(1));
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(*executor, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::TEARDOWN);
+
+ schedulerLibrary.send(call);
+ }
+
+ // Ensure that the executor receives a `Event::Shutdown` after the
+ // scheduler exit.
+ AWAIT_READY(shutdown);
+
+ Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {
[2/5] mesos git commit: Replaced `.get()` `Option` calls with `->`
operator.
Posted by vi...@apache.org.
Replaced `.get()` `Option` calls with `->` operator.
This change replaces `.get()` calls with `->` in the scheduler library.
Review: https://reviews.apache.org/r/44728/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8623345
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8623345
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8623345
Branch: refs/heads/master
Commit: b86233452b49955b4e0a5692bc720b3c22e8a063
Parents: 6fd7c4c
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:00 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:00 2016 -0700
----------------------------------------------------------------------
src/scheduler/scheduler.cpp | 30 +++++++++++++++---------------
1 file changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b8623345/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 35f4794..6a83447 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -209,7 +209,7 @@ public:
Option<Error> error = validation::scheduler::call::validate(devolve(call));
if (error.isSome()) {
- drop(call, error.get().message);
+ drop(call, error->message);
return;
}
@@ -253,7 +253,7 @@ public:
CHECK_SOME(streamId);
// Set the stream ID associated with this connection.
- request.headers["Mesos-Stream-Id"] = streamId.get().toString();
+ request.headers["Mesos-Stream-Id"] = streamId->toString();
response = connections->nonSubscribe.send(request);
}
@@ -419,7 +419,7 @@ protected:
VLOG(1) << "Re-detecting master";
master = None();
latest = None();
- } else if (future.get().isNone()) {
+ } else if (future->isNone()) {
VLOG(1) << "Lost leading master";
master = None();
latest = None();
@@ -496,12 +496,12 @@ protected:
if (response->code == process::http::Status::OK) {
// Only SUBSCRIBE call should get a "200 OK" response.
CHECK_EQ(Call::SUBSCRIBE, call.type());
- CHECK_EQ(response.get().type, http::Response::PIPE);
- CHECK_SOME(response.get().reader);
+ CHECK_EQ(response->type, http::Response::PIPE);
+ CHECK_SOME(response->reader);
state = SUBSCRIBED;
- Pipe::Reader reader = response.get().reader.get();
+ Pipe::Reader reader = response->reader.get();
auto deserializer =
lambda::bind(deserialize<Event>, contentType, lambda::_1);
@@ -512,9 +512,9 @@ protected:
subscribed = SubscribedResponse {reader, decoder};
// Responses to SUBSCRIBE calls should always include a stream ID.
- CHECK(response.get().headers.contains("Mesos-Stream-Id"));
+ CHECK(response->headers.contains("Mesos-Stream-Id"));
- streamId = UUID::fromString(response.get().headers.at("Mesos-Stream-Id"));
+ streamId = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
read();
@@ -537,15 +537,15 @@ protected:
if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
// This could happen if the master hasn't realized it is the leader yet
// or is still in the process of recovery.
- LOG(WARNING) << "Received '" << response.get().status << "' ("
- << response.get().body << ") for " << call.type();
+ LOG(WARNING) << "Received '" << response->status << "' ("
+ << response->body << ") for " << call.type();
return;
}
// We should be able to get here only for AuthN errors which is not
// yet supported for HTTP frameworks.
- error("Received unexpected '" + response.get().status + "' (" +
- response.get().body + ") for " + stringify(call.type()));
+ error("Received unexpected '" + response->status + "' (" +
+ response->body + ") for " + stringify(call.type()));
}
void read()
@@ -579,7 +579,7 @@ protected:
}
// This could happen if the master failed over after sending an event.
- if (!event.get().isSome()) {
+ if (!event->isSome()) {
const string error = "End-Of-File received from master. The master "
"closed the event stream";
LOG(ERROR) << error;
@@ -588,8 +588,8 @@ protected:
return;
}
- if (event.get().isError()) {
- error("Failed to de-serialize event: " + event.get().error());
+ if (event->isError()) {
+ error("Failed to de-serialize event: " + event->error());
} else {
receive(event.get().get(), false);
}
[4/5] mesos git commit: Modified `FaultToleranceTest.SchedulerExit`
to wait for shutdown.
Posted by vi...@apache.org.
Modified `FaultToleranceTest.SchedulerExit` to wait for shutdown.
This change modified the existing test to wait for the shutdown
message instead of making a best-effort `AtMost` check for
the message.
Review: https://reviews.apache.org/r/44731/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c35ee31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c35ee31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c35ee31
Branch: refs/heads/master
Commit: 8c35ee31a9320acd829b35a6f7c7b189becd385c
Parents: 92b3541
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:11 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:11 2016 -0700
----------------------------------------------------------------------
src/tests/fault_tolerance_tests.cpp | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8c35ee31/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 349669d..67f3627 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1580,12 +1580,17 @@ TEST_F(FaultToleranceTest, SchedulerExit)
AWAIT_READY(status);
EXPECT_EQ(TASK_RUNNING, status.get().state());
+ Future<Nothing> shutdown;
EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
+ .WillOnce(FutureSatisfy(&shutdown));
driver.stop();
driver.join();
+ // Ensure that the executor receives a shutdown message after the
+ // scheduler exit.
+ AWAIT_READY(shutdown);
+
Shutdown();
}
[3/5] mesos git commit: Close the connection upon framework teardown
for HTTP frameworks.
Posted by vi...@apache.org.
Close the connection upon framework teardown for HTTP frameworks.
This change fixes a `TODO` and closes the connection upon receieving a
teardown request from the HTTP framework.
Review: https://reviews.apache.org/r/44729/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/92b35417
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/92b35417
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/92b35417
Branch: refs/heads/master
Commit: 92b35417b8b44f1eeaa19638c5d4bc6b9c52a0bd
Parents: b862334
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:05 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:05 2016 -0700
----------------------------------------------------------------------
src/master/master.cpp | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/92b35417/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d0380db..e6290ea 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6166,7 +6166,10 @@ void Master::removeFramework(Framework* framework)
// TODO(benh): unlink(framework->pid);
- // TODO(anand): For http frameworks, close the connection.
+ // For http frameworks, close the connection.
+ if (framework->http.isSome()) {
+ framework->http->close();
+ }
framework->unregisteredTime = Clock::now();