You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2017/10/19 23:34:23 UTC
[02/11] mesos git commit: Added basic `network/ports` isolator tests.
Added basic `network/ports` isolator tests.
Added tests to verify that the `network/ports` isolator is able to
correctly terminate only tasks that use rogue TCP ports.
Review: https://reviews.apache.org/r/60765/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/48ac8727
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/48ac8727
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/48ac8727
Branch: refs/heads/master
Commit: 48ac8727e65d2bfe7b1421614442f3e0431b2262
Parents: b5e6f3a
Author: James Peach <jp...@apache.org>
Authored: Thu Oct 19 15:36:10 2017 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu Oct 19 16:33:35 2017 -0700
----------------------------------------------------------------------
.../containerizer/ports_isolator_tests.cpp | 718 ++++++++++++++++++-
1 file changed, 715 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/48ac8727/src/tests/containerizer/ports_isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/ports_isolator_tests.cpp b/src/tests/containerizer/ports_isolator_tests.cpp
index 016f9cc..51357e4 100644
--- a/src/tests/containerizer/ports_isolator_tests.cpp
+++ b/src/tests/containerizer/ports_isolator_tests.cpp
@@ -15,22 +15,33 @@
// limitations under the License.
#include <string>
+#include <type_traits>
+#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/process.hpp>
#include <stout/gtest.hpp>
+#include "master/detector/standalone.hpp"
+
+#include "slave/constants.hpp"
+
#include "slave/containerizer/mesos/isolators/network/ports.hpp"
#include "tests/mesos.hpp"
+using mesos::internal::slave::NetworkPortsIsolatorProcess;
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+using mesos::master::detector::StandaloneMasterDetector;
+
+using process::Clock;
using process::Future;
using process::Owned;
-using mesos::internal::slave::NetworkPortsIsolatorProcess;
-
using std::string;
using std::vector;
@@ -40,7 +51,99 @@ namespace mesos {
namespace internal {
namespace tests {
-class NetworkPortsIsolatorTest : public MesosTest {};
+class NetworkPortsIsolatorTest : public MesosTest
+{
+public:
+  // Runs the base fixture setup and then seeds `std::rand()` from the
+  // current time; the port selection helpers in this file draw their
+  // random ports from `std::rand()`.
+  virtual void SetUp()
+  {
+    MesosTest::SetUp();
+
+    std::srand(std::time(nullptr));
+  }
+
+  // Blocks until `status` has been delivered and the agent has then
+  // processed a status update acknowledgement. Tests advance the clock
+  // right after calling this; without waiting for the acknowledgement
+  // the agent could time out waiting for it, re-send the update and
+  // hand the test a duplicate status update it does not expect.
+  void awaitStatusUpdateAcked(Future<TaskStatus>& status)
+  {
+    // The interception has to be armed before we block on the update.
+    Future<Nothing> acknowledged =
+      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+    AWAIT_READY(status);
+    AWAIT_READY(acknowledged);
+  }
+
+  // Checks that `status` is a TASK_RUNNING update that was generated by
+  // a health check transition and that reports the task as healthy.
+  void expectHealthyStatus(const TaskStatus& status)
+  {
+    EXPECT_EQ(TASK_RUNNING, status.state());
+    EXPECT_EQ(
+        TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
+        status.reason());
+
+    // Only inspect `healthy` once we know the field is present.
+    ASSERT_TRUE(status.has_healthy());
+    EXPECT_TRUE(status.healthy());
+  }
+
+  // Checks that `status` carries a container limitation made up of
+  // exactly one resource, and that this resource is a `ports` resource.
+  // When `port` is given, the limitation must additionally consist of
+  // the single range [port, port].
+  void expectPortsLimitation(
+      const TaskStatus& status,
+      const Option<uint64_t>& port = None())
+  {
+    EXPECT_EQ(TaskStatus::REASON_CONTAINER_LIMITATION, status.reason());
+
+    ASSERT_TRUE(status.has_limitation()) << JSON::protobuf(status);
+
+    const Resources limited(status.limitation().resources());
+    EXPECT_EQ(1u, limited.size());
+
+    const auto ranges = limited.ports();
+    ASSERT_SOME(ranges);
+
+    // Nothing more to verify when the caller did not name a port.
+    if (port.isNone()) {
+      return;
+    }
+
+    ASSERT_EQ(1, ranges->range().size());
+    EXPECT_EQ(port.get(), ranges->range(0).begin());
+    EXPECT_EQ(port.get(), ranges->range(0).end());
+  }
+};
+
+
+// Returns a port picked with `std::rand()` from the first range of the
+// `ports` resource in `resources`; both ends of the range are eligible.
+// The caller must make sure that a `ports` resource with at least one
+// range is present.
+static uint16_t selectRandomPort(const Resources& resources)
+{
+  // Take a copy of the range rather than a reference.
+  // NOTE(review): `ports()` appears to return a temporary, in which
+  // case a reference would dangle - confirm against `Resources`.
+  Value::Range range = resources.ports()->range(0);
+
+  const auto count = range.end() - range.begin() + 1;
+
+  return range.begin() + std::rand() % count;
+}
+
+
+// Returns a random port from `resources` that differs from `port`, by
+// drawing from `selectRandomPort()` until the result is not `port`.
+// The first ports range must therefore contain at least one port other
+// than `port`, otherwise this never returns.
+static uint16_t selectOtherPort(const Resources& resources, uint16_t port)
+{
+  uint16_t candidate = selectRandomPort(resources);
+
+  while (candidate == port) {
+    candidate = selectRandomPort(resources);
+  }
+
+  return candidate;
+}
+
+
+// Configures the health check of `taskInfo` as a TCP check against
+// `port`. `T` can be any message type that provides
+// `mutable_health_check()`.
+template <typename T>
+static void addTcpHealthCheck(T& taskInfo, uint16_t port)
+{
+  auto* healthCheck = taskInfo.mutable_health_check();
+
+  // Name the pointee type so that its `TCP` enumerator can be used
+  // without spelling out the concrete health check message type.
+  using HealthCheckType =
+    typename std::remove_pointer<decltype(healthCheck)>::type;
+
+  healthCheck->set_type(HealthCheckType::TCP);
+  healthCheck->mutable_tcp()->set_port(port);
+
+  // No initial delay, a one second interval and a ten second grace
+  // period.
+  healthCheck->set_delay_seconds(0);
+  healthCheck->set_interval_seconds(1);
+  healthCheck->set_grace_period_seconds(10);
+}
// This test verifies that we can correctly detect sockets that
@@ -100,6 +203,615 @@ TEST(NetworkPortsIsolatorUtilityTest, QueryProcessSockets)
<< processAddress;
}
+
+// The `network/ports` isolator can only be used together with the
+// `linux` launcher. This test checks that starting an agent with the
+// isolator enabled yields an error for the `posix` launcher and
+// succeeds for the `linux` launcher.
+TEST_F(NetworkPortsIsolatorTest, ROOT_IsolatorFlags)
+{
+  StandaloneMasterDetector detector;
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  agentFlags.isolation = "network/ports";
+
+  // Unsupported launcher: the agent must not come up.
+  agentFlags.launcher = "posix";
+  Try<Owned<cluster::Slave>> agent = StartSlave(&detector, agentFlags);
+  ASSERT_ERROR(agent);
+
+  // Supported launcher: the agent starts normally.
+  agentFlags.launcher = "linux";
+  agent = StartSlave(&detector, agentFlags);
+  ASSERT_SOME(agent);
+}
+
+
+// libprocess always listens on a port when it is initialized
+// with no control over whether task resources are allocated
+// for that port. This test verifies that a task that uses the
+// command executor will always be killed due to the libprocess
+// port even when it doesn't open any ports itself.
+TEST_F(NetworkPortsIsolatorTest, ROOT_CommandExecutorPorts)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/ports";
+  flags.launcher = "linux";
+
+  // Check every port, not just the agent's ports range, so that the
+  // executor's libprocess port is taken into account.
+  flags.check_agent_port_range_only = false;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  // The task deliberately holds no `ports` resources and opens no
+  // ports of its own.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:32").get(),
+      "sleep 10000");
+
+  Future<TaskStatus> startingStatus;
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(startingStatus);
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+  awaitStatusUpdateAcked(runningStatus);
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+  Future<Nothing> check =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  // Install the expectation for the failure update before triggering
+  // the isolator check. The update is produced as a consequence of the
+  // check and could otherwise reach the scheduler while the previous
+  // expectation is already saturated, failing the test spuriously.
+  Future<TaskStatus> failedStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&failedStatus));
+
+  // Advance the clock by the watch interval and wait for the isolator
+  // check to be dispatched.
+  Clock::pause();
+  Clock::advance(flags.container_ports_watch_interval);
+
+  AWAIT_READY(check);
+
+  Clock::settle();
+  Clock::resume();
+
+  // Even though the task itself never listened on any ports, we expect
+  // that it gets killed because the isolator detects the libprocess
+  // port the command executor is listening on.
+  AWAIT_READY(failedStatus);
+  EXPECT_EQ(task.task_id(), failedStatus->task_id());
+  EXPECT_EQ(TASK_FAILED, failedStatus->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
+  expectPortsLimitation(failedStatus.get());
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a task that only listens on a port for
+// which it holds resources is allowed to run and is not killed by the
+// `network/ports` isolator: the task becomes healthy, an isolator
+// check runs, and the task only terminates once the framework kills it.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_AllocatedPorts)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  agentFlags.isolation = "network/ports";
+  agentFlags.launcher = "linux";
+
+  // Only ports inside the agent's ports resources range are checked,
+  // so that the isolator reacts to the nc command and not to the
+  // command executor.
+  agentFlags.check_agent_port_range_only = true;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(agent);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers->front();
+
+  // The offer has to contain a `ports` resource to pick a port from.
+  const Resources offered(offer.resources());
+  ASSERT_SOME(offered.ports());
+  ASSERT_LE(1, offered.ports()->range().size());
+
+  const uint16_t taskPort = selectRandomPort(offered);
+  const string portText = stringify(taskPort);
+
+  // Allocate exactly the port the task is going to listen on.
+  const Resources taskResources = Resources::parse(
+      "cpus:1;mem:32;"
+      "ports:[" + portText + "," + portText + "]").get();
+
+  // Use "nc -k" so nc keeps running after accepting the healthcheck
+  // connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      taskResources,
+      "nc -k -l " + portText);
+
+  addTcpHealthCheck(task, taskPort);
+
+  Future<TaskStatus> starting;
+  Future<TaskStatus> running;
+  Future<TaskStatus> healthy;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&starting))
+    .WillOnce(FutureArg<1>(&running))
+    .WillOnce(FutureArg<1>(&healthy));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(starting);
+  EXPECT_EQ(task.task_id(), starting->task_id());
+  EXPECT_EQ(TASK_STARTING, starting->state());
+
+  awaitStatusUpdateAcked(running);
+  EXPECT_EQ(task.task_id(), running->task_id());
+  EXPECT_EQ(TASK_RUNNING, running->state());
+
+  awaitStatusUpdateAcked(healthy);
+  ASSERT_EQ(task.task_id(), healthy->task_id());
+  expectHealthyStatus(healthy.get());
+
+  // Advance the clock by the watch interval and wait for the isolator
+  // check to be dispatched.
+  Future<Nothing> checked =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  Clock::pause();
+  Clock::advance(agentFlags.container_ports_watch_interval);
+
+  AWAIT_READY(checked);
+
+  Clock::settle();
+  Clock::resume();
+
+  // The next update we expect is the one caused by our own kill.
+  Future<TaskStatus> killed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&killed));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(killed);
+  EXPECT_EQ(task.task_id(), killed->task_id());
+  EXPECT_EQ(TASK_KILLED, killed->state());
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test covers an agent whose `ports` resource is empty while the
+// check_agent_port_range_only flag is enabled. The set of ports that
+// the isolator protects is then effectively empty, so a task that
+// listens on an arbitrary port must be allowed to start up and become
+// healthy, and must only terminate once the framework kills it.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_NoPortsResource)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  agentFlags.isolation = "network/ports";
+  agentFlags.launcher = "linux";
+
+  // Omit the default ports from the agent resources.
+  agentFlags.resources = R"(
+ [
+ {
+ "name": "cpus",
+ "type": "SCALAR",
+ "scalar": {
+ "value": 2
+ }
+ },
+ {
+ "name": "gpus",
+ "type": "SCALAR",
+ "scalar": {
+ "value": 0
+ }
+ },
+ {
+ "name": "mem",
+ "type": "SCALAR",
+ "scalar": {
+ "value": 1024
+ }
+ },
+ {
+ "name": "disk",
+ "type": "SCALAR",
+ "scalar": {
+ "value": 1024
+ }
+ },
+ {
+ "name": "ports",
+ "type": "RANGES",
+ "ranges": {
+ }
+ }
+ ]
+ )";
+
+  // Only ports inside the agent's ports resources range are checked,
+  // so that the isolator reacts to the nc command and not to the
+  // command executor.
+  agentFlags.check_agent_port_range_only = true;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), agentFlags);
+  ASSERT_SOME(agent);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers->front();
+
+  // Make sure we do not have a `ports` resource in the offer.
+  const Resources offered(offer.resources());
+  ASSERT_NONE(offered.ports());
+
+  // The offer has no ports to choose from, so select a random task
+  // port from the default range instead.
+  const Resources defaultPorts = Resources::parse(
+      "ports",
+      stringify(slave::DEFAULT_PORTS),
+      agentFlags.default_role).get();
+
+  const uint16_t taskPort = selectRandomPort(defaultPorts);
+
+  // The task itself is launched without any `ports` resources.
+  const Resources taskResources = Resources::parse("cpus:1;mem:32").get();
+
+  // Use "nc -k" so nc keeps running after accepting the healthcheck
+  // connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      taskResources,
+      "nc -k -l " + stringify(taskPort));
+
+  addTcpHealthCheck(task, taskPort);
+
+  Future<TaskStatus> starting;
+  Future<TaskStatus> running;
+  Future<TaskStatus> healthy;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&starting))
+    .WillOnce(FutureArg<1>(&running))
+    .WillOnce(FutureArg<1>(&healthy));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(starting);
+  EXPECT_EQ(task.task_id(), starting->task_id());
+  EXPECT_EQ(TASK_STARTING, starting->state());
+
+  awaitStatusUpdateAcked(running);
+  EXPECT_EQ(task.task_id(), running->task_id());
+  EXPECT_EQ(TASK_RUNNING, running->state());
+
+  awaitStatusUpdateAcked(healthy);
+  ASSERT_EQ(task.task_id(), healthy->task_id());
+  expectHealthyStatus(healthy.get());
+
+  // Advance the clock by the watch interval and wait for the isolator
+  // check to be dispatched.
+  Future<Nothing> checked =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  Clock::pause();
+  Clock::advance(agentFlags.container_ports_watch_interval);
+
+  AWAIT_READY(checked);
+
+  Clock::settle();
+  Clock::resume();
+
+  // The next update we expect is the one caused by our own kill.
+  Future<TaskStatus> killed;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&killed));
+
+  driver.killTask(task.task_id());
+
+  AWAIT_READY(killed);
+  EXPECT_EQ(task.task_id(), killed->task_id());
+  EXPECT_EQ(TASK_KILLED, killed->state());
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that the isolator correctly defaults the agent ports
+// resource when the operator doesn't specify any ports. We verify that a
+// task that uses an unallocated port in the agent's offer is detected and
+// killed by a container limitation.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_DefaultPortsResource)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/ports";
+  flags.launcher = "linux";
+
+  // Clear resources set by CreateSlaveFlags() to force the agent and
+  // isolator to apply built-in defaults.
+  flags.resources = None();
+
+  // Watch only the agent ports resources range because we want this
+  // test to trigger on the nc command, not on the command executor.
+  flags.check_agent_port_range_only = true;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  // Make sure we have a `ports` resource.
+  Resources resources(offer.resources());
+  ASSERT_SOME(resources.ports());
+  ASSERT_LE(1, resources.ports()->range().size());
+
+  // `taskPort` is the port the task is allocated; `usedPort` is a
+  // different port from the offer that the task actually listens on.
+  uint16_t taskPort = selectRandomPort(resources);
+  uint16_t usedPort = selectOtherPort(resources, taskPort);
+
+  resources = Resources::parse(
+      "cpus:1;mem:32;"
+      "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+  // Use "nc -k" so nc keeps running after accepting the healthcheck
+  // connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      resources,
+      "nc -k -l " + stringify(usedPort));
+
+  addTcpHealthCheck(task, usedPort);
+
+  Future<TaskStatus> startingStatus;
+  Future<TaskStatus> runningStatus;
+  Future<TaskStatus> healthStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus))
+    .WillOnce(FutureArg<1>(&healthStatus));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(startingStatus);
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+  awaitStatusUpdateAcked(runningStatus);
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+  awaitStatusUpdateAcked(healthStatus);
+  ASSERT_EQ(task.task_id(), healthStatus->task_id());
+  expectHealthyStatus(healthStatus.get());
+
+  Future<Nothing> check =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  // Install the expectation for the failure update before triggering
+  // the isolator check. The update is produced as a consequence of the
+  // check and could otherwise reach the scheduler while the previous
+  // expectation is already saturated, failing the test spuriously.
+  Future<TaskStatus> failedStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&failedStatus));
+
+  // Advance the clock by the watch interval and wait for the isolator
+  // check to be dispatched.
+  Clock::pause();
+  Clock::advance(flags.container_ports_watch_interval);
+
+  AWAIT_READY(check);
+
+  Clock::settle();
+  Clock::resume();
+
+  // We expect that the task will get killed by the isolator.
+  AWAIT_READY(failedStatus);
+  EXPECT_EQ(task.task_id(), failedStatus->task_id());
+  EXPECT_EQ(TASK_FAILED, failedStatus->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
+  expectPortsLimitation(failedStatus.get(), usedPort);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that a task that listens on a port for which it has
+// no resources is detected and killed by a container limitation.
+TEST_F(NetworkPortsIsolatorTest, ROOT_NC_UnallocatedPorts)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "network/ports";
+  flags.launcher = "linux";
+
+  // Watch only the agent ports resources range because we want this
+  // test to trigger on the nc command, not on the command executor.
+  flags.check_agent_port_range_only = true;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  const Offer& offer = offers.get()[0];
+
+  // Make sure we have a `ports` resource.
+  Resources resources(offer.resources());
+  ASSERT_SOME(resources.ports());
+  ASSERT_LE(1, resources.ports()->range().size());
+
+  // `taskPort` is the port the task is allocated; `usedPort` is a
+  // different port from the offer that the task actually listens on.
+  uint16_t taskPort = selectRandomPort(resources);
+  uint16_t usedPort = selectOtherPort(resources, taskPort);
+
+  resources = Resources::parse(
+      "cpus:1;mem:32;"
+      "ports:[" + stringify(taskPort) + "," + stringify(taskPort) + "]").get();
+
+  // Launch a task that uses a port that it hasn't been allocated. Use
+  // "nc -k" so nc keeps running after accepting the healthcheck connection.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      resources,
+      "nc -k -l " + stringify(usedPort));
+
+  addTcpHealthCheck(task, usedPort);
+
+  Future<TaskStatus> startingStatus;
+  Future<TaskStatus> runningStatus;
+  Future<TaskStatus> healthStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus))
+    .WillOnce(FutureArg<1>(&healthStatus));
+
+  driver.launchTasks(offer.id(), {task});
+
+  awaitStatusUpdateAcked(startingStatus);
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+
+  awaitStatusUpdateAcked(runningStatus);
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+
+  awaitStatusUpdateAcked(healthStatus);
+  ASSERT_EQ(task.task_id(), healthStatus->task_id());
+  expectHealthyStatus(healthStatus.get());
+
+  Future<Nothing> check =
+    FUTURE_DISPATCH(_, &NetworkPortsIsolatorProcess::check);
+
+  // Install the expectation for the failure update before triggering
+  // the isolator check. The update is produced as a consequence of the
+  // check and could otherwise reach the scheduler while the previous
+  // expectation is already saturated, failing the test spuriously.
+  Future<TaskStatus> failedStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&failedStatus));
+
+  // Advance the clock by the watch interval and wait for the isolator
+  // check to be dispatched.
+  Clock::pause();
+  Clock::advance(flags.container_ports_watch_interval);
+
+  AWAIT_READY(check);
+
+  Clock::settle();
+  Clock::resume();
+
+  // We expect that the task will get killed by the isolator.
+  AWAIT_READY(failedStatus);
+  EXPECT_EQ(task.task_id(), failedStatus->task_id());
+  EXPECT_EQ(TASK_FAILED, failedStatus->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, failedStatus->source());
+  expectPortsLimitation(failedStatus.get(), usedPort);
+
+  driver.stop();
+  driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {