You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/05/26 23:57:19 UTC

[1/3] mesos git commit: Added a test for the optional executor reconnect retry in the agent.

Repository: mesos
Updated Branches:
  refs/heads/master 5ba175f5e -> e45dd39c7


Added a test for the optional executor reconnect retry in the agent.

This tests that the retries occur if the agent does not receive
a re-registration from the executor within the retry interval.

Review: https://reviews.apache.org/r/59585


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/94780733
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/94780733
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/94780733

Branch: refs/heads/master
Commit: 94780733b3004e8b3005b220d09d0d23e1327a78
Parents: 5ba175f
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu May 25 14:35:42 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri May 26 16:56:51 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 110 ++++++++++++++++++++++++++++++++
 1 file changed, 110 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/94780733/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index aedc60a..df0c5c8 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -803,6 +803,116 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
 }
 
 
+// This ensures that when the executor reconnect retry is enabled,
+// the agent will retry the reconnect messages until the executor
+// responds. We then ensure that no further re-registration
+// messages are seen once the executor has re-registered.
+TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
+{
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = this->CreateSlaveFlags();
+
+  Fetcher fetcher(flags);
+
+  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  Future<TaskStatus> statusUpdate;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+
+  // Pause the clock to ensure the agent does not retry the
+  // status update. We will ensure the acknowledgement is
+  // checkpointed before we terminate the agent.
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(statusUpdate);
+  EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+
+  // Ensure the acknowledgement is checkpointed.
+  Clock::settle();
+
+  slave.get()->terminate();
+
+  // We drop the first re-registration message to emulate
+  // a half-open connection closing for "old" executors
+  // that do not have the fix for MESOS-7057.
+  Future<ReregisterExecutorMessage> reregisterExecutorMessage =
+    DROP_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  // Restart the slave with a new containerizer and the retry enabled.
+  _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  flags.executor_reregistration_timeout = Seconds(5);
+  flags.executor_reregistration_retry_interval = Seconds(1);
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  // The first attempt by the executor to re-register is dropped
+  // so that the agent will retry the reconnect.
+  AWAIT_READY(reregisterExecutorMessage);
+
+  // Now trigger the retry and let the second response through.
+  reregisterExecutorMessage =
+    FUTURE_PROTOBUF(ReregisterExecutorMessage(), _, _);
+
+  Clock::advance(flags.executor_reregistration_retry_interval.get());
+
+  AWAIT_READY(reregisterExecutorMessage);
+
+  // Now ensure that further retries do not occur, since the
+  // executor is already re-registered.
+  EXPECT_NO_FUTURE_PROTOBUFS(ReregisterExecutorMessage(), _, _);
+
+  Clock::advance(flags.executor_reregistration_retry_interval.get());
+  Clock::settle();
+
+  // We have to resume the clock to ensure that the containerizer
+  // can reap the executor pid (i.e. the reaper requires a resumed
+  // clock).
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // The slave is stopped before the HTTP based command executor is
 // registered. When it comes back up with recovery=reconnect, make
 // sure the executor is killed and the task is transitioned to LOST.


[3/3] mesos git commit: Added a test for ignoring executor re-registrations.

Posted by bm...@apache.org.
Added a test for ignoring executor re-registrations.

When the executor reconnect retry is enabled, the agent will ignore
any subsequent executor re-registrations since the agent cannot
correctly handle these in the steady state case.

Review: https://reviews.apache.org/r/59587


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e45dd39c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e45dd39c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e45dd39c

Branch: refs/heads/master
Commit: e45dd39c759c3f0535cbcb5b2473d4bb9542d8ee
Parents: 6a97007
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu May 25 14:40:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri May 26 16:56:54 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 106 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e45dd39c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 52611de..927b9c3 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7104,6 +7104,112 @@ TEST_F(SlaveTest, ShutdownV0ExecutorIfItReregistersWithoutReconnect)
   driver.join();
 }
 
+
+// This ensures that if the executor reconnect retry is enabled,
+// re-registrations from PID-based V0 executors are ignored when
+// already (re-)registered.
+//
+// TODO(bmahler): It should be simpler to write a test that
+// follows a standard recipe (e.g. bring up a mock executor).
+TEST_F(SlaveTest, IgnoreV0ExecutorIfItReregistersWithoutReconnect)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  agentFlags.executor_reregistration_timeout = Seconds(2);
+  agentFlags.executor_reregistration_retry_interval = Seconds(1);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, agentFlags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Advance the clock to trigger both agent registration and a batch
+  // allocation.
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture this message to learn the agent and executor PIDs.
+  Future<Message> registerExecutorMessage =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+  task.mutable_resources()->MergeFrom(offers->at(0).resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(registerExecutorMessage);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  // Now spoof an executor re-registration. It should be ignored: the
+  // agent should neither respond nor shut down the executor.
+  EXPECT_NO_FUTURE_PROTOBUFS(ExecutorReregisteredMessage(), _, _);
+
+  Future<Nothing> executorShutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&executorShutdown));
+
+  UPID executorPid = registerExecutorMessage->from;
+  UPID agentPid = registerExecutorMessage->to;
+
+  ReregisterExecutorMessage reregisterExecutorMessage;
+  reregisterExecutorMessage.mutable_executor_id()->CopyFrom(
+      task.executor().executor_id());
+  reregisterExecutorMessage.mutable_framework_id()->CopyFrom(
+      frameworkId);
+
+  process::post(executorPid, agentPid, reregisterExecutorMessage);
+
+  Clock::settle();
+  EXPECT_TRUE(executorShutdown.isPending());
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/3] mesos git commit: Added a test for shutting down executors that re-register.

Posted by bm...@apache.org.
Added a test for shutting down executors that re-register.

When an already (re-)registered executor attempts to re-register
and the agent does not have the optional reconnect retry enabled,
the agent will shut down the executor.

Review: https://reviews.apache.org/r/59586


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a970072
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a970072
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a970072

Branch: refs/heads/master
Commit: 6a970072e91f7574511c50434baea39734f71dbc
Parents: 9478073
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu May 25 14:38:12 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Fri May 26 16:56:53 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 100 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6a970072/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 68ed8e9..52611de 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -7004,6 +7004,106 @@ TEST_F(SlaveTest, MaxCompletedExecutorsPerFrameworkFlag)
   }
 }
 
+
+// This ensures that if the executor reconnect retry is disabled,
+// PID-based V0 executors are disallowed from re-registering in
+// the steady state.
+//
+// TODO(bmahler): It should be simpler to write a test that
+// follows a standard recipe (e.g. bring up a mock executor).
+TEST_F(SlaveTest, ShutdownV0ExecutorIfItReregistersWithoutReconnect)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, agentFlags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  // Advance the clock to trigger both agent registration and a batch
+  // allocation.
+  Clock::advance(agentFlags.registration_backoff_factor);
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture this message to learn the agent and executor PIDs.
+  Future<Message> registerExecutorMessage =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  TaskInfo task;
+  task.set_name("test-task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+  task.mutable_resources()->MergeFrom(offers->at(0).resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  driver.launchTasks(offers->at(0).id(), {task});
+
+  AWAIT_READY(registerExecutorMessage);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status->state());
+
+  // Now spoof an executor re-registration; the executor
+  // should be shut down.
+  Future<Nothing> executorShutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&executorShutdown));
+
+  UPID executorPid = registerExecutorMessage->from;
+  UPID agentPid = registerExecutorMessage->to;
+
+  ReregisterExecutorMessage reregisterExecutorMessage;
+  reregisterExecutorMessage.mutable_executor_id()->CopyFrom(
+      task.executor().executor_id());
+  reregisterExecutorMessage.mutable_framework_id()->CopyFrom(
+      frameworkId);
+
+  process::post(executorPid, agentPid, reregisterExecutorMessage);
+
+  AWAIT_READY(executorShutdown);
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {