You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2017/10/19 23:34:31 UTC

[10/11] mesos git commit: Added `network/ports` isolator recovery tests.

Added `network/ports` isolator recovery tests.

Added a test for the `network/ports` isolator recovery by starting
a task that is listening on a rogue port. We only configure the
isolator when we restart the agent to simulate the case where a
task only starts to misbehave after an agent recovery.

Review: https://reviews.apache.org/r/60593/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/91a3e243
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/91a3e243
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/91a3e243

Branch: refs/heads/master
Commit: 91a3e24392c71a74b23c6d288c2b0d7b90b9bfce
Parents: 48ac872
Author: James Peach <jp...@apache.org>
Authored: Thu Oct 19 15:36:14 2017 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu Oct 19 16:33:35 2017 -0700

----------------------------------------------------------------------
 .../containerizer/ports_isolator_tests.cpp      | 266 +++++++++++++++++++
 1 file changed, 266 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/91a3e243/src/tests/containerizer/ports_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/ports_isolator_tests.cpp b/src/tests/containerizer/ports_isolator_tests.cpp
index 51357e4..41c800f 100644
--- a/src/tests/containerizer/ports_isolator_tests.cpp
+++ b/src/tests/containerizer/ports_isolator_tests.cpp
@@ -27,6 +27,7 @@
 #include "master/detector/standalone.hpp"
 
 #include "slave/constants.hpp"
+#include "slave/slave.hpp"
 
 #include "slave/containerizer/mesos/isolators/network/ports.hpp"
 
@@ -812,6 +813,271 @@ TEST_F(NetworkPortsIsolatorTest, ROOT_NC_UnallocatedPorts)
   driver.join();
 }
 
+
+// Test that after we recover a task, the isolator notices that it
+// is using the wrong ports and kills it.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverBadTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Start the agent without any `network/ports` isolation.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.launcher = "linux";
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      frameworkInfo,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  Resources resources(offers.get()[0].resources());
+
+  // Make sure we have a `ports` resource.
+  ASSERT_SOME(resources.ports());
+  ASSERT_LE(1, resources.ports()->range().size());
+
+  uint16_t taskPort = selectRandomPort(resources);
+  uint16_t usedPort = selectOtherPort(resources, taskPort);
+
+  resources = Resources::parse(
+      "cpus:1;mem:32;"
+      "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+  // Launch a task that uses a port that it hasn't been allocated.  Use
+  // "nc -k" so nc keeps running after accepting the healthcheck connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      resources,
+      "nc -k -l " + stringify(usedPort));
+
+  addTcpHealthCheck(task, usedPort);
+
+  Future<TaskStatus> startingStatus;
+  Future<TaskStatus> runningStatus;
+  Future<TaskStatus> healthStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus))
+    .WillOnce(FutureArg<1>(&healthStatus));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(startingStatus);
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+  awaitStatusUpdateAcked(runningStatus);
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+  awaitStatusUpdateAcked(healthStatus);
+  ASSERT_EQ(task.task_id(), healthStatus->task_id());
+  expectHealthyStatus(healthStatus.get());
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Add `network/ports` isolation to the restarted agent. This tests that when
+  // the isolator goes through recovery we will notice the nc command listening
+  // and terminate it.
+  flags.isolation = "network/ports";
+  flags.check_agent_port_range_only = true;
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Now force a ports check, which should terminate the nc command.
+  Future<Nothing> check =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  Future<TaskStatus> failedStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&failedStatus));
+
+  Clock::pause();
+  Clock::advance(flags.container_ports_watch_interval);
+
+  AWAIT_READY(check);
+
+  Clock::settle();
+  Clock::resume();
+
+  // We expect that the task will get killed by the isolator.
+  AWAIT_READY(failedStatus);
+  EXPECT_EQ(task.task_id(), failedStatus->task_id());
+  EXPECT_EQ(TASK_FAILED, failedStatus->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
+  expectPortsLimitation(failedStatus.get(), usedPort);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// Test that the isolator doesn't kill well-behaved tasks on recovery.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverGoodTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.launcher = "linux";
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      frameworkInfo,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  Resources resources(offers.get()[0].resources());
+
+  // Make sure we have a `ports` resource.
+  ASSERT_SOME(resources.ports());
+  ASSERT_LE(1, resources.ports()->range().size());
+
+  uint16_t taskPort = selectRandomPort(resources);
+
+  resources = Resources::parse(
+      "cpus:1;mem:32;"
+      "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+  // Use "nc -k" so nc keeps running after accepting the healthcheck connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      resources,
+      "nc -k -l " + stringify(taskPort));
+
+  addTcpHealthCheck(task, taskPort);
+
+  Future<TaskStatus> startingStatus;
+  Future<TaskStatus> runningStatus;
+  Future<TaskStatus> healthStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus))
+    .WillOnce(FutureArg<1>(&healthStatus));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(startingStatus);
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+  awaitStatusUpdateAcked(runningStatus);
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+  awaitStatusUpdateAcked(healthStatus);
+  ASSERT_EQ(task.task_id(), healthStatus->task_id());
+  expectHealthyStatus(healthStatus.get());
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  // Add `network/ports` isolation to the restarted agent. This tests that
+  // when the isolator goes through recovery we will notice the nc command
+  // listening and will let it continue running.
+  flags.isolation = "network/ports";
+  flags.check_agent_port_range_only = true;
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // We should not get any status updates because the task should
+  // stay running. We wait for a check to run and settle any
+  // messages that result from that to ensure we don't miss any
+  // triggered limitations.
+  EXPECT_CALL(sched, statusUpdate(&driver, _)).Times(0);
+
+  Future<Nothing> check =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  Clock::pause();
+  Clock::advance(flags.container_ports_watch_interval);
+
+  AWAIT_READY(check);
+
+  Clock::settle();
+  Clock::resume();
+
+  // Since the task is still running, we should be able to kill it
+  // and receive the expected status update state.
+  Future<TaskStatus> killedStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&killedStatus))
+    .RetiresOnSaturation();
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(killedStatus);
+  EXPECT_EQ(task.task_id(), killedStatus->task_id());
+  EXPECT_EQ(TASK_KILLED, killedStatus->state());
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {