You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2016/08/23 16:42:42 UTC
mesos git commit: Added test to simulate slow/unresponsive fetch.
Repository: mesos
Updated Branches:
refs/heads/master 6b1aa0084 -> a064505e4
Added test to simulate slow/unresponsive fetch.
Added test to simulate the scenario of a slow/unresponsive HDFS leading
to an executor registration timeout, and to verify that the slave gets
notified of the failure.
Review: https://reviews.apache.org/r/50000/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a064505e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a064505e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a064505e
Branch: refs/heads/master
Commit: a064505e411fe78a257e9b336a888f1eeddaa949
Parents: 6b1aa00
Author: Megha Sharma <ms...@apple.com>
Authored: Mon Aug 22 14:51:07 2016 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Tue Aug 23 09:42:31 2016 -0700
----------------------------------------------------------------------
src/tests/slave_tests.cpp | 129 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 129 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a064505e/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 30ca3da..dcf8454 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -263,6 +263,135 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
}
+// This test verifies that when a slow URI fetch keeps the executor
+// from registering within `executor_registration_timeout`, the agent
+// destroys the container and the scheduler is notified: it receives
+// an `executorLost` callback and a TASK_FAILED status update sourced
+// from the agent with reason REASON_CONTAINER_LAUNCH_FAILED.
+TEST_F(SlaveTest, ExecutorTimeoutCausedBySlowFetch)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Create a fake Hadoop installation under the current working
+  // directory; `flags.hadoop_home` is pointed at it below.
+  string hadoopPath = os::getcwd();
+  string hadoopBinPath = path::join(hadoopPath, "bin");
+
+  ASSERT_SOME(os::mkdir(hadoopBinPath));
+  ASSERT_SOME(os::chmod(hadoopBinPath, S_IRWXU | S_IRWXG | S_IRWXO));
+
+  // A fake "hadoop" script that sleeps for 1000 seconds of wall-clock
+  // time, so the HDFS fetch is still hanging when the test triggers
+  // the executor registration timeout.
+  string mockHadoopScript = "#!/usr/bin/env bash\n"
+                            "sleep 1000";
+
+  string hadoopCommand = path::join(hadoopBinPath, "hadoop");
+  ASSERT_SOME(os::write(hadoopCommand, mockHadoopScript));
+  ASSERT_SOME(os::chmod(hadoopCommand,
+                        S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH));
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.hadoop_home = hadoopPath;
+
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  // ASSERT (not CHECK) so that a creation failure fails this test
+  // rather than aborting the entire test binary.
+  ASSERT_SOME(_containerizer);
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(
+      detector.get(),
+      containerizer.get(),
+      flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched,
+      DEFAULT_FRAMEWORK_INFO,
+      master.get()->pid,
+      DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  // ASSERT rather than EXPECT: the first offer is dereferenced below,
+  // which would be undefined behavior on an empty vector.
+  ASSERT_NE(0u, offers.get().size());
+  const Offer& offer = offers.get()[0];
+
+  // Launch a task with the command executor.
+  // The task uses a URI that needs to be fetched by the HDFS client
+  // and will be blocked until the executor registration times out.
+  CommandInfo commandInfo;
+  CommandInfo::URI* uri = commandInfo.add_uris();
+  uri->set_value(path::join("hdfs://dummyhost/dummypath", "test"));
+
+  // Using a dummy command value as it's a required field. The
+  // command won't be invoked.
+  commandInfo.set_value("sleep 10");
+
+  ExecutorID executorId;
+  executorId.set_value("test-executor-staging");
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      offer.resources(),
+      commandInfo,
+      executorId,
+      "test-task-staging");
+
+  // Register every expectation before launching the task, so no event
+  // can be delivered before the matcher meant to observe it exists.
+  Future<Nothing> fetch = FUTURE_DISPATCH(
+      _, &FetcherProcess::fetch);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> executorLost;
+  EXPECT_CALL(sched, executorLost(&driver, executorId, _, _))
+    .WillOnce(FutureSatisfy(&executorLost));
+
+  // Ensure that the slave times out and kills the executor.
+  Future<Nothing> destroyExecutor = FUTURE_DISPATCH(
+      _, &MesosContainerizerProcess::destroy);
+
+  // Pause the clock so the registration timeout only fires once the
+  // test explicitly advances time past it.
+  Clock::pause();
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(fetch);
+
+  Clock::advance(flags.executor_registration_timeout);
+
+  AWAIT_READY(destroyExecutor);
+
+  Clock::settle(); // Wait for Containerizer::destroy to complete.
+
+  // Now advance time until the reaper reaps the executor.
+  while (status.isPending()) {
+    Clock::advance(process::MAX_REAP_INTERVAL());
+    Clock::settle();
+  }
+
+  AWAIT_READY(executorLost);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FAILED, status->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED, status->reason());
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+}
+
+
// This test verifies that when an executor terminates before
// registering with slave, it is properly cleaned up.
TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)