Posted to commits@mesos.apache.org by jp...@apache.org on 2017/10/19 23:34:31 UTC
[10/11] mesos git commit: Added `network/ports` isolator recovery tests.
Added `network/ports` isolator recovery tests.
Added tests for `network/ports` isolator recovery by starting a
task that listens on a rogue port. We only configure the isolator
when we restart the agent, to simulate the case where a task only
starts to misbehave after an agent recovery. A second test checks
that a well-behaved task is left running after recovery.
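For context, the policy these tests exercise is straightforward: on each
periodic check the isolator compares the ports a container is actually
listening on against the port ranges allocated to it, and raises a container
limitation (surfaced to the framework as TASK_FAILED with SOURCE_SLAVE) on
any mismatch. The sketch below only illustrates that comparison; it is not
the actual `NetworkPortsIsolatorProcess` code, and the function name, the
plain std containers, and the free-function shape are assumptions made for
the example.

    #include <cstdint>
    #include <set>
    #include <utility>
    #include <vector>

    // Illustrative sketch (not the real isolator): return the listen ports
    // that fall outside the container's allocated port ranges. A non-empty
    // result is what would trigger a container limitation.
    std::vector<uint16_t> findRogueListeners(
        const std::set<uint16_t>& listenPorts,
        const std::vector<std::pair<uint16_t, uint16_t>>& allocatedRanges)
    {
      std::vector<uint16_t> rogue;

      for (uint16_t port : listenPorts) {
        bool allocated = false;

        for (const auto& range : allocatedRanges) {
          if (port >= range.first && port <= range.second) {
            allocated = true;
            break;
          }
        }

        if (!allocated) {
          rogue.push_back(port);
        }
      }

      return rogue;
    }

In the real isolator the listen ports are roughly what it discovers from the
container's sockets, and the ranges come from the container's allocated
`ports` resources.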
Review: https://reviews.apache.org/r/60593/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/91a3e243
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/91a3e243
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/91a3e243
Branch: refs/heads/master
Commit: 91a3e24392c71a74b23c6d288c2b0d7b90b9bfce
Parents: 48ac872
Author: James Peach <jp...@apache.org>
Authored: Thu Oct 19 15:36:14 2017 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu Oct 19 16:33:35 2017 -0700
----------------------------------------------------------------------
.../containerizer/ports_isolator_tests.cpp | 266 +++++++++++++++++++
1 file changed, 266 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/91a3e243/src/tests/containerizer/ports_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/ports_isolator_tests.cpp b/src/tests/containerizer/ports_isolator_tests.cpp
index 51357e4..41c800f 100644
--- a/src/tests/containerizer/ports_isolator_tests.cpp
+++ b/src/tests/containerizer/ports_isolator_tests.cpp
@@ -27,6 +27,7 @@
#include "master/detector/standalone.hpp"
#include "slave/constants.hpp"
+#include "slave/slave.hpp"
#include "slave/containerizer/mesos/isolators/network/ports.hpp"
@@ -812,6 +813,271 @@ TEST_F(NetworkPortsIsolatorTest, ROOT_NC_UnallocatedPorts)
driver.join();
}
+
+// Test that after we recover a task, the isolator notices that it
+// is using the wrong ports and kills it.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverBadTask)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Start the agent without any `network/ports` isolation.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.launcher = "linux";
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+
+ MesosSchedulerDriver driver(
+ &sched,
+ frameworkInfo,
+ master.get()->pid,
+ DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_EQ(1u, offers->size());
+
+ const Offer& offer = offers.get()[0];
+
+ Resources resources(offers.get()[0].resources());
+
+ // Make sure we have a `ports` resource.
+ ASSERT_SOME(resources.ports());
+ ASSERT_LE(1, resources.ports()->range().size());
+
+ uint16_t taskPort = selectRandomPort(resources);
+ uint16_t usedPort = selectOtherPort(resources, taskPort);
+
+ resources = Resources::parse(
+ "cpus:1;mem:32;"
+ "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+ // Launch a task that uses a port that hasn't been allocated to it. Use
+ // "nc -k" so that nc keeps running after accepting the health check connection.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ resources,
+ "nc -k -l " + stringify(usedPort));
+
+ addTcpHealthCheck(task, usedPort);
+
+ Future<TaskStatus> startingStatus;
+ Future<TaskStatus> runningStatus;
+ Future<TaskStatus> healthStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
+ .WillOnce(FutureArg<1>(&runningStatus))
+ .WillOnce(FutureArg<1>(&healthStatus));
+
+ driver.launchTasks(offer.id(), {task});
+
+ awaitStatusUpdateAcked(startingStatus);
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+ awaitStatusUpdateAcked(runningStatus);
+ EXPECT_EQ(task.task_id(), runningStatus->task_id());
+ EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+ awaitStatusUpdateAcked(healthStatus);
+ ASSERT_EQ(task.task_id(), healthStatus->task_id());
+ expectHealthyStatus(healthStatus.get());
+
+ // Restart the agent.
+ slave.get()->terminate();
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Add `network/ports` isolation to the restarted agent. This tests that when
+ // the isolator goes through recovery it notices the nc command listening on
+ // an unallocated port and terminates the task.
+ flags.isolation = "network/ports";
+ flags.check_agent_port_range_only = true;
+
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // Now force a ports check, which should terminate the nc command.
+ Future<Nothing> check =
+ FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+ Future<TaskStatus> failedStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&failedStatus));
+
+ Clock::pause();
+ Clock::advance(flags.container_ports_watch_interval);
+
+ AWAIT_READY(check);
+
+ Clock::settle();
+ Clock::resume();
+
+ // We expect that the task will get killed by the isolator.
+ AWAIT_READY(failedStatus);
+ EXPECT_EQ(task.task_id(), failedStatus->task_id());
+ EXPECT_EQ(TASK_FAILED, failedStatus->state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
+ expectPortsLimitation(failedStatus.get(), usedPort);
+
+ driver.stop();
+ driver.join();
+}
+
+
+// Test that the isolator doesn't kill well-behaved tasks on recovery.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_RecoverGoodTask)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.launcher = "linux";
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true);
+
+ MockScheduler sched;
+
+ MesosSchedulerDriver driver(
+ &sched,
+ frameworkInfo,
+ master.get()->pid,
+ DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_EQ(1u, offers->size());
+
+ const Offer& offer = offers.get()[0];
+
+ Resources resources(offers.get()[0].resources());
+
+ // Make sure we have a `ports` resource.
+ ASSERT_SOME(resources.ports());
+ ASSERT_LE(1, resources.ports()->range().size());
+
+ uint16_t taskPort = selectRandomPort(resources);
+
+ resources = Resources::parse(
+ "cpus:1;mem:32;"
+ "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+ // Use "nc -k" so nc keeps running after accepting the healthcheck connection.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ resources,
+ "nc -k -l " + stringify(taskPort));
+
+ addTcpHealthCheck(task, taskPort);
+
+ Future<TaskStatus> startingStatus;
+ Future<TaskStatus> runningStatus;
+ Future<TaskStatus> healthStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
+ .WillOnce(FutureArg<1>(&runningStatus))
+ .WillOnce(FutureArg<1>(&healthStatus));
+
+ driver.launchTasks(offer.id(), {task});
+
+ awaitStatusUpdateAcked(startingStatus);
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+ awaitStatusUpdateAcked(runningStatus);
+ EXPECT_EQ(task.task_id(), runningStatus->task_id());
+ EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+ awaitStatusUpdateAcked(healthStatus);
+ ASSERT_EQ(task.task_id(), healthStatus->task_id());
+ expectHealthyStatus(healthStatus.get());
+
+ // Restart the agent.
+ slave.get()->terminate();
+
+ // Add `network/ports` isolation to the restarted agent. This tests that
+ // when the isolator goes through recovery it notices the nc command
+ // listening on its allocated port and lets it continue running.
+ flags.isolation = "network/ports";
+ flags.check_agent_port_range_only = true;
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // We should not get any status updates because the task should
+ // stay running. We wait for a check to run and then settle the
+ // clock so that any resulting messages are processed, ensuring we
+ // would not miss a limitation if one were triggered.
+ EXPECT_CALL(sched, statusUpdate(&driver, _)).Times(0);
+
+ Future<Nothing> check =
+ FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+ Clock::pause();
+ Clock::advance(flags.container_ports_watch_interval);
+
+ AWAIT_READY(check);
+
+ Clock::settle();
+ Clock::resume();
+
+ // Since the task is still running, we should be able to kill it
+ // and receive a TASK_KILLED status update.
+ Future<TaskStatus> killedStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&killedStatus))
+ .RetiresOnSaturation();
+
+ driver.killTask(task.task_id());
+
+ AWAIT_READY(killedStatus);
+ EXPECT_EQ(task.task_id(), killedStatus->task_id());
+ EXPECT_EQ(TASK_KILLED, killedStatus->state());
+
+ driver.stop();
+ driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
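These are `ROOT_` tests, so running them needs root privileges and the `nc`
utility on the host. Assuming a typical build tree where the test binary is
built as src/mesos-tests (an assumption; the exact path depends on the build
setup), both new tests can be selected with a standard gtest filter:

    sudo ./src/mesos-tests --gtest_filter="NetworkPortsIsolatorTest.ROOT_NC_Recover*"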