You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/16 01:03:36 UTC

[1/5] mesos git commit: Made `ExamplesTest.TestHTTPFramework` use the example http executor.

Repository: mesos
Updated Branches:
  refs/heads/master 37958fd70 -> 3e85b743a


Made `ExamplesTest.TestHTTPFramework` use the example http executor.

Review: https://reviews.apache.org/r/44727/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fd7c4cc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fd7c4cc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fd7c4cc

Branch: refs/heads/master
Commit: 6fd7c4cc0fba37f2fe2b9f86332f8cf61343567b
Parents: 37958fd
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:02:50 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:02:50 2016 -0700

----------------------------------------------------------------------
 src/examples/test_http_framework.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6fd7c4cc/src/examples/test_http_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_framework.cpp b/src/examples/test_http_framework.cpp
index 036cdcf..64c5097 100644
--- a/src/examples/test_http_framework.cpp
+++ b/src/examples/test_http_framework.cpp
@@ -369,12 +369,12 @@ int main(int argc, char** argv)
   string uri;
   Option<string> value = os::getenv("MESOS_BUILD_DIR");
   if (value.isSome()) {
-    uri = path::join(value.get(), "src", "test-executor");
+    uri = path::join(value.get(), "src", "test-http-executor");
   } else {
     uri = path::join(
         os::realpath(Path(argv[0]).dirname()).get(),
         "src",
-        "test-executor");
+        "test-http-executor");
   }
 
   mesos::internal::logging::Flags flags;


[5/5] mesos git commit: Added fault tolerance tests for the V1 API.

Posted by vi...@apache.org.
Added fault tolerance tests for the V1 API.

Similar to the existing tests in `fault_tolerance_tests.cpp`,
this change adds fault tolerance tests for HTTP schedulers/executors.
Some tests have been omitted since they are no longer valid, due
to the persistent streaming connection between the master and the
scheduler in the new API.

Review: https://reviews.apache.org/r/44733/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3e85b743
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3e85b743
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3e85b743

Branch: refs/heads/master
Commit: 3e85b743a169b89bda18af79f197c5e921261cc8
Parents: 8c35ee3
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:15 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:15 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                          |   1 +
 src/tests/http_fault_tolerance_tests.cpp | 958 ++++++++++++++++++++++++++
 2 files changed, 959 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 7ee5a65..9dd21b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1845,6 +1845,7 @@ mesos_tests_SOURCES =						\
   tests/hierarchical_allocator_tests.cpp			\
   tests/hook_tests.cpp						\
   tests/http_authentication_tests.cpp				\
+  tests/http_fault_tolerance_tests.cpp				\
   tests/log_tests.cpp						\
   tests/logging_tests.cpp					\
   tests/main.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/3e85b743/src/tests/http_fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/http_fault_tolerance_tests.cpp b/src/tests/http_fault_tolerance_tests.cpp
new file mode 100644
index 0000000..7c7f3d9
--- /dev/null
+++ b/src/tests/http_fault_tolerance_tests.cpp
@@ -0,0 +1,958 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <gmock/gmock.h>
+
+#include <mesos/executor.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/try.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "master/allocator/mesos/allocator.hpp"
+
+#include "master/master.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using mesos::internal::master::allocator::MesosAllocatorProcess;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Containerizer;
+using mesos::internal::slave::Slave;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+using mesos::v1::scheduler::Mesos;
+
+using process::Clock;
+using process::Future;
+using process::PID;
+
+using std::string;
+
+using testing::_;
+using testing::AtMost;
+using testing::DoAll;
+using testing::Return;
+using testing::SaveArg;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+// NOTE: These tests exercise the v1 HTTP API and are semantically similar to
+// the corresponding tests in `src/tests/fault_tolerance_tests.cpp`.
+class HttpFaultToleranceTest : public MesosTest {};
+
+
+// This test verifies that a scheduler attempting to subscribe with the id of
+// a framework whose failover timeout has elapsed receives an error.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterFailoverTimeout)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  v1::FrameworkInfo frameworkInfo = DEFAULT_V1_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  Future<Nothing> deactivateFramework = FUTURE_DISPATCH(
+      _, &master::allocator::MesosAllocatorProcess::deactivateFramework);
+
+  v1::FrameworkID frameworkId;
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  // Launch the first (i.e., failing) scheduler and record the framework id
+  // from its `SUBSCRIBED` event; the scheduler goes away at the end of scope.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    Future<Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    {
+      Call call;
+      call.set_type(Call::SUBSCRIBE);
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(subscribed);
+
+    frameworkId = subscribed->framework_id();
+  }
+
+  // Wait until the allocator is asked to deactivate the framework.
+  AWAIT_READY(deactivateFramework);
+
+  // Simulate framework failover timeout.
+  Clock::pause();
+  Clock::settle();
+
+  Try<Duration> failoverTimeout =
+    Duration::create(frameworkInfo.failover_timeout());
+
+  ASSERT_SOME(failoverTimeout);
+
+  Future<Nothing> frameworkFailoverTimeout =
+    FUTURE_DISPATCH(_, &Master::frameworkFailoverTimeout);
+
+  Clock::advance(failoverTimeout.get());
+
+  // Wait until `Master::frameworkFailoverTimeout` has been dispatched.
+  AWAIT_READY(frameworkFailoverTimeout);
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    // Framework should get `Error` event because the framework with this id
+    // is marked as completed.
+    Future<Nothing> error;
+    EXPECT_CALL(*scheduler, error(_, _))
+      .WillOnce(FutureSatisfy(&error));
+
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .Times(AtMost(1));
+
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+      subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(error);
+  }
+
+  Shutdown();
+}
+
+
+// This test verifies that a scheduler attempting to subscribe with the id of
+// a framework that has been torn down receives an error.
+TEST_F(HttpFaultToleranceTest, SchedulerSubscribeAfterTeardown)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  v1::FrameworkID frameworkId;
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  // Launch the first scheduler, wait until it receives a `SUBSCRIBED` event
+  // and tear it down before launching the second (i.e., failover) scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    Future<Event::Subscribed> subscribed;
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    EXPECT_CALL(*scheduler, heartbeat(_))
+      .WillRepeatedly(Return()); // Ignore heartbeats.
+
+    {
+      Call call;
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(subscribed);
+
+    frameworkId = subscribed->framework_id();
+
+    Future<Nothing> removeFramework = FUTURE_DISPATCH(
+      _, &master::allocator::MesosAllocatorProcess::removeFramework);
+
+    Future<Nothing> disconnected;
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .WillOnce(FutureSatisfy(&disconnected));
+
+    // Send a `TEARDOWN` call for the framework now.
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::TEARDOWN);
+
+      schedulerLibrary.send(call);
+    }
+
+    // Wait until the allocator is asked to remove the framework.
+    AWAIT_READY(removeFramework);
+
+    // Wait for `removeFramework()` to be completed on the master.
+    Clock::pause();
+    Clock::settle();
+
+    // The scheduler should eventually realize the disconnection.
+    AWAIT_READY(disconnected);
+  }
+
+  // Now launch the second (i.e., failover) scheduler using the
+  // framework id recorded from the first scheduler.
+  {
+    auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+    Future<Nothing> connected;
+    EXPECT_CALL(*scheduler, connected(_))
+      .WillOnce(FutureSatisfy(&connected))
+      .WillRepeatedly(Return()); // Ignore future invocations.
+
+    scheduler::TestV1Mesos schedulerLibrary(
+        master.get(), contentType, scheduler);
+
+    AWAIT_READY(connected);
+
+    // Framework should get `Error` event because the framework
+    // with this id is marked as completed.
+    Future<Nothing> error;
+    EXPECT_CALL(*scheduler, error(_, _))
+      .WillOnce(FutureSatisfy(&error));
+
+    EXPECT_CALL(*scheduler, disconnected(_))
+      .Times(AtMost(1));
+
+    {
+      Call call;
+      call.mutable_framework_id()->CopyFrom(frameworkId);
+      call.set_type(Call::SUBSCRIBE);
+
+      Call::Subscribe* subscribe = call.mutable_subscribe();
+      subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+      subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+      schedulerLibrary.send(call);
+    }
+
+    AWAIT_READY(error);
+  }
+
+  Shutdown();
+}
+
+
+// This test checks that a failed over scheduler gets the retried status update
+// when the original instance dies without acknowledging the update.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverStatusUpdate)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(executor::SendUpdateFromTask(
+        frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+  Future<Nothing> acknowledged;
+  EXPECT_CALL(*executor, acknowledged(_, _))
+    .WillOnce(FutureSatisfy(&acknowledged));
+
+  Future<Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(acknowledged);
+  AWAIT_READY(update);
+
+  EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+  EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+  EXPECT_TRUE(update->status().has_executor_id());
+  EXPECT_TRUE(update->status().has_uuid());
+
+  // Fail over to a new scheduler without acknowledging the status update.
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Fail over to another scheduler instance (reusing the `connected` future).
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Scheduler2 should receive the retried status update.
+  Future<Nothing> update2;
+  EXPECT_CALL(*scheduler2, update(_, _))
+    .WillOnce(FutureSatisfy(&update2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Clock::pause();
+
+  // Now advance the clock by `STATUS_UPDATE_RETRY_INTERVAL_MIN` so that the
+  // unacknowledged status update is expected to be sent again.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(update2);
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler receives a message sent
+// by the executor to the framework.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverExecutorToFrameworkMessage)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  v1::executor::Mesos* executorLib;
+  EXPECT_CALL(*executor, subscribed(_, _))
+    .WillOnce(SaveArg<0>(&executorLib));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Fail over to another scheduler instance (reusing the `connected` future).
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Future<Event::Message> message;
+  EXPECT_CALL(*scheduler2, message(_, _))
+    .WillOnce(FutureArg<1>(&message));
+
+  {
+    v1::executor::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+    call.set_type(v1::executor::Call::MESSAGE);
+
+    v1::executor::Call::Message* message = call.mutable_message();
+    message->set_data("hello world");
+
+    executorLib->send(call);
+  }
+
+  AWAIT_READY(message);
+  ASSERT_EQ("hello world", message->data());
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test ensures that the failed over scheduler is able to send a
+// `MESSAGE` call that is delivered to the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerFailoverFrameworkToExecutorMessage)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", executorId));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  auto scheduler2 = std::make_shared<MockV1HTTPScheduler>();
+
+  EXPECT_CALL(*scheduler2, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  // Fail over to another scheduler instance (reusing the `connected` future).
+  scheduler::TestV1Mesos schedulerLibrary2(
+      master.get(), contentType, scheduler2);
+
+  AWAIT_READY(connected);
+
+  // The previously connected scheduler instance should receive an
+  // error/disconnected event.
+  Future<Nothing> error;
+  EXPECT_CALL(*scheduler, error(_, _))
+    .WillOnce(FutureSatisfy(&error));
+
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .WillOnce(FutureSatisfy(&disconnected));
+
+  EXPECT_CALL(*scheduler2, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler2, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+    subscribe->mutable_framework_info()->mutable_id()->CopyFrom(frameworkId);
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(error);
+  AWAIT_READY(disconnected);
+  AWAIT_READY(subscribed);
+
+  EXPECT_EQ(frameworkId, subscribed.get().framework_id());
+
+  Future<v1::executor::Event::Message> message;
+  EXPECT_CALL(*executor, message(_, _))
+    .WillOnce(FutureArg<1>(&message));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::MESSAGE);
+
+    Call::Message* message = call.mutable_message();
+    message->mutable_agent_id()->CopyFrom(offer.agent_id());
+    message->mutable_executor_id()->CopyFrom(DEFAULT_V1_EXECUTOR_ID);
+    message->set_data("hello world");
+
+    schedulerLibrary2.send(call);
+  }
+
+  AWAIT_READY(message);
+  ASSERT_EQ("hello world", message->data());
+
+  EXPECT_CALL(*executor, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*executor, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(*scheduler2, disconnected(_))
+    .Times(AtMost(1));
+
+  Shutdown();
+}
+
+
+// This test checks that a scheduler `TEARDOWN` shuts down the executor.
+TEST_F(HttpFaultToleranceTest, SchedulerExit)
+{
+  master::Flags flags = CreateMasterFlags();
+  flags.authenticate_frameworks = false;
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+  auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+  ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+  TestContainerizer containerizer(executorId, executor);
+
+  Try<PID<Slave>> slave = StartSlave(&containerizer);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore future invocations.
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  scheduler::TestV1Mesos schedulerLibrary(master.get(), contentType, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed.get().framework_id());
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  EXPECT_CALL(*executor, connected(_))
+    .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+  EXPECT_CALL(*executor, subscribed(_, _));
+
+  Future<Nothing> launch;
+  EXPECT_CALL(*executor, launch(_, _))
+    .WillOnce(FutureSatisfy(&launch));
+
+  const v1::Offer& offer = offers->offers(0);
+
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    schedulerLibrary.send(call);
+  }
+
+  AWAIT_READY(launch);
+
+  EXPECT_CALL(*scheduler, disconnected(_))
+    .Times(AtMost(1));
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(*executor, shutdown(_))
+    .WillOnce(FutureSatisfy(&shutdown));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::TEARDOWN);
+
+    schedulerLibrary.send(call);
+  }
+
+  // Ensure that the executor receives an `Event::Shutdown` after the
+  // scheduler's `TEARDOWN` call.
+  AWAIT_READY(shutdown);
+
+  Shutdown();
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {


[2/5] mesos git commit: Replaced `.get()` `Option` calls with `->` operator.

Posted by vi...@apache.org.
Replaced `.get()` `Option` calls with `->` operator.

This change replaces `.get()` calls with `->` in the scheduler library.

Review: https://reviews.apache.org/r/44728/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b8623345
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b8623345
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b8623345

Branch: refs/heads/master
Commit: b86233452b49955b4e0a5692bc720b3c22e8a063
Parents: 6fd7c4c
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:00 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:00 2016 -0700

----------------------------------------------------------------------
 src/scheduler/scheduler.cpp | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b8623345/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 35f4794..6a83447 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -209,7 +209,7 @@ public:
     Option<Error> error = validation::scheduler::call::validate(devolve(call));
 
     if (error.isSome()) {
-      drop(call, error.get().message);
+      drop(call, error->message);
       return;
     }
 
@@ -253,7 +253,7 @@ public:
       CHECK_SOME(streamId);
 
       // Set the stream ID associated with this connection.
-      request.headers["Mesos-Stream-Id"] = streamId.get().toString();
+      request.headers["Mesos-Stream-Id"] = streamId->toString();
 
       response = connections->nonSubscribe.send(request);
     }
@@ -419,7 +419,7 @@ protected:
       VLOG(1) << "Re-detecting master";
       master = None();
       latest = None();
-    } else if (future.get().isNone()) {
+    } else if (future->isNone()) {
       VLOG(1) << "Lost leading master";
       master = None();
       latest = None();
@@ -496,12 +496,12 @@ protected:
     if (response->code == process::http::Status::OK) {
       // Only SUBSCRIBE call should get a "200 OK" response.
       CHECK_EQ(Call::SUBSCRIBE, call.type());
-      CHECK_EQ(response.get().type, http::Response::PIPE);
-      CHECK_SOME(response.get().reader);
+      CHECK_EQ(response->type, http::Response::PIPE);
+      CHECK_SOME(response->reader);
 
       state = SUBSCRIBED;
 
-      Pipe::Reader reader = response.get().reader.get();
+      Pipe::Reader reader = response->reader.get();
 
       auto deserializer =
         lambda::bind(deserialize<Event>, contentType, lambda::_1);
@@ -512,9 +512,9 @@ protected:
       subscribed = SubscribedResponse {reader, decoder};
 
       // Responses to SUBSCRIBE calls should always include a stream ID.
-      CHECK(response.get().headers.contains("Mesos-Stream-Id"));
+      CHECK(response->headers.contains("Mesos-Stream-Id"));
 
-      streamId = UUID::fromString(response.get().headers.at("Mesos-Stream-Id"));
+      streamId = UUID::fromString(response->headers.at("Mesos-Stream-Id"));
 
       read();
 
@@ -537,15 +537,15 @@ protected:
     if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
       // This could happen if the master hasn't realized it is the leader yet
       // or is still in the process of recovery.
-      LOG(WARNING) << "Received '" << response.get().status << "' ("
-                   << response.get().body << ") for " << call.type();
+      LOG(WARNING) << "Received '" << response->status << "' ("
+                   << response->body << ") for " << call.type();
       return;
     }
 
     // We should be able to get here only for AuthN errors which is not
     // yet supported for HTTP frameworks.
-    error("Received unexpected '" + response.get().status + "' (" +
-          response.get().body + ") for " + stringify(call.type()));
+    error("Received unexpected '" + response->status + "' (" +
+          response->body + ") for " + stringify(call.type()));
   }
 
   void read()
@@ -579,7 +579,7 @@ protected:
     }
 
     // This could happen if the master failed over after sending an event.
-    if (!event.get().isSome()) {
+    if (!event->isSome()) {
       const string error = "End-Of-File received from master. The master "
                            "closed the event stream";
       LOG(ERROR) << error;
@@ -588,8 +588,8 @@ protected:
       return;
     }
 
-    if (event.get().isError()) {
-      error("Failed to de-serialize event: " + event.get().error());
+    if (event->isError()) {
+      error("Failed to de-serialize event: " + event->error());
     } else {
       receive(event.get().get(), false);
     }


[4/5] mesos git commit: Modified `FaultToleranceTest.SchedulerExit` to wait for shutdown.

Posted by vi...@apache.org.
Modified `FaultToleranceTest.SchedulerExit` to wait for shutdown.

This change modifies the existing test to wait for the shutdown
message instead of making a best-effort `AtMost` check for
the message.

Review: https://reviews.apache.org/r/44731/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c35ee31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c35ee31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c35ee31

Branch: refs/heads/master
Commit: 8c35ee31a9320acd829b35a6f7c7b189becd385c
Parents: 92b3541
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:11 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:11 2016 -0700

----------------------------------------------------------------------
 src/tests/fault_tolerance_tests.cpp | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8c35ee31/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 349669d..67f3627 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1580,12 +1580,17 @@ TEST_F(FaultToleranceTest, SchedulerExit)
   AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
+  Future<Nothing> shutdown;
   EXPECT_CALL(exec, shutdown(_))
-    .Times(AtMost(1));
+    .WillOnce(FutureSatisfy(&shutdown));
 
   driver.stop();
   driver.join();
 
+  // Ensure that the executor receives a shutdown message after the
+  // scheduler exit.
+  AWAIT_READY(shutdown);
+
   Shutdown();
 }
 


[3/5] mesos git commit: Close the connection upon framework teardown for HTTP frameworks.

Posted by vi...@apache.org.
Close the connection upon framework teardown for HTTP frameworks.

This change fixes a `TODO` and closes the connection upon receiving a
teardown request from the HTTP framework.

Review: https://reviews.apache.org/r/44729/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/92b35417
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/92b35417
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/92b35417

Branch: refs/heads/master
Commit: 92b35417b8b44f1eeaa19638c5d4bc6b9c52a0bd
Parents: b862334
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Mar 15 17:03:05 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Mar 15 17:03:05 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/92b35417/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index d0380db..e6290ea 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6166,7 +6166,10 @@ void Master::removeFramework(Framework* framework)
 
   // TODO(benh): unlink(framework->pid);
 
-  // TODO(anand): For http frameworks, close the connection.
+  // For http frameworks, close the connection.
+  if (framework->http.isSome()) {
+    framework->http->close();
+  }
 
   framework->unregisteredTime = Clock::now();