You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/05 23:42:48 UTC
mesos git commit: Fixed a bug in PosixDiskIsolator during slave recovery.
Repository: mesos
Updated Branches:
refs/heads/master dda6cb29f -> ceb136921
Fixed a bug in PosixDiskIsolator during slave recovery.
Review: https://reviews.apache.org/r/31772
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ceb13692
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ceb13692
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ceb13692
Branch: refs/heads/master
Commit: ceb136921df8317c163dfd32ba4c5e480d7f9738
Parents: dda6cb2
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Mar 5 12:13:37 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Mar 5 14:42:18 2015 -0800
----------------------------------------------------------------------
src/slave/containerizer/mesos/containerizer.cpp | 14 +-
src/slave/state.cpp | 7 -
src/slave/state.hpp | 5 -
src/tests/disk_quota_tests.cpp | 161 +++++++++++++++++--
4 files changed, 165 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index ec4626f..fbd1c0a 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -354,10 +354,22 @@ Future<Nothing> MesosContainerizerProcess::recover(
<< "' for executor '" << executor.id
<< "' of framework " << framework.id;
+ // NOTE: We create the executor directory before checkpointing
+ // the executor. Therefore, it's not possible for this
+ // directory to be non-existent.
+ const string& directory = paths::getExecutorRunPath(
+ flags.work_dir,
+ state.get().id,
+ framework.id,
+ executor.id,
+ containerId);
+
+ CHECK(os::exists(directory));
+
ExecutorRunState executorRunState(
run.get().id.get(),
run.get().forkedPid.get(),
- run.get().directory);
+ directory);
recoverable.push_back(executorRunState);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 41f6c2c..35ce70b 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -399,13 +399,6 @@ Try<RunState> RunState::recover(
state.id = containerId;
string message;
- state.directory = paths::getExecutorRunPath(
- rootDir,
- slaveId,
- frameworkId,
- executorId,
- containerId);
-
// See if the sentinel file exists. This is done first so it is
// known even if partial state is returned, e.g., if the libprocess
// pid file is not recovered. It indicates the slave removed the
http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 0201e72..31dfdd5 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -265,11 +265,6 @@ struct RunState
Option<pid_t> forkedPid;
Option<process::UPID> libprocessPid;
- // NOTE: We create the executor directory before checkpointing the
- // executor. Therefore, it's not possible for this directory to be
- // non-existent.
- std::string directory;
-
// Executor terminated and all its updates acknowledged.
bool completed;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/tests/disk_quota_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/disk_quota_tests.cpp b/src/tests/disk_quota_tests.cpp
index 56d82ee..9c3a881 100644
--- a/src/tests/disk_quota_tests.cpp
+++ b/src/tests/disk_quota_tests.cpp
@@ -34,6 +34,7 @@
#include "master/master.hpp"
+#include "slave/constants.hpp"
#include "slave/flags.hpp"
#include "slave/slave.hpp"
@@ -296,9 +297,7 @@ TEST_F(DiskQuotaTest, NoQuotaEnforcement)
break;
}
- if (elapsed > Seconds(5)) {
- FAIL() << "Failed to wait for process to be traced";
- }
+ ASSERT_LT(elapsed, Seconds(5));
os::sleep(Milliseconds(1));
elapsed += Milliseconds(1);
@@ -377,14 +376,158 @@ TEST_F(DiskQuotaTest, ResourceStatistics)
ContainerID containerId = *(containers.get().begin());
- Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
- AWAIT_READY(usage);
+ // Wait until disk usage can be retrieved.
+ Duration elapsed = Duration::zero();
+ while (true) {
+ Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+ EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+ if (usage.get().has_disk_used_bytes()) {
+ EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+ break;
+ }
+
+ ASSERT_LT(elapsed, Seconds(5));
+
+ os::sleep(Milliseconds(1));
+ elapsed += Milliseconds(1);
+ }
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+
+ delete containerizer.get();
+}
+
+
+// This test verifies that the disk quota isolator recovers properly
+// after the slave restarts.
+TEST_F(DiskQuotaTest, SlaveRecovery)
+{
+ Try<PID<Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true;
+ flags.isolation = "posix/cpu,posix/mem,posix/disk";
+ flags.container_disk_watch_interval = Milliseconds(1);
+
+ Fetcher fetcher;
+
+ Try<MesosContainerizer*> containerizer =
+ MesosContainerizer::create(flags, true, &fetcher);
+
+ ASSERT_SOME(containerizer);
+
+ Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+
+ // Enable checkpointing for the framework.
+ FrameworkInfo frameworkInfo;
+ frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+ frameworkInfo.set_checkpoint(true);
+
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(_, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ // Create a task that writes a 2MB file while being allocated 3MB of disk.
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ Resources::parse("cpus:1;mem:128;disk:3").get(),
+ "dd if=/dev/zero of=file bs=1048576 count=2 && sleep 1000");
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ driver.launchTasks(offers.get()[0].id(), {task});
+
+ AWAIT_READY(status);
+ EXPECT_EQ(task.task_id(), status.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+ AWAIT_READY(containers);
+ ASSERT_EQ(1u, containers.get().size());
+
+ ContainerID containerId = *(containers.get().begin());
+
+ // Stop the slave.
+ Stop(slave.get());
+ delete containerizer.get();
+
+ Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ containerizer = MesosContainerizer::create(flags, true, &fetcher);
+ ASSERT_SOME(containerizer);
+
+ slave = StartSlave(containerizer.get(), flags);
+ ASSERT_SOME(slave);
+
+ Clock::pause();
+
+ AWAIT_READY(_recover);
+
+ // Wait for the slave to schedule the reregister timeout.
+ Clock::settle();
+
+ // Ensure the slave considers itself recovered.
+ Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT);
+
+ // NOTE: We resume the clock because we need the reaper to reap the
+ // 'du' subprocess.
+ Clock::resume();
- ASSERT_TRUE(usage.get().has_disk_limit_bytes());
- EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+ // Wait for the slave to re-register.
+ AWAIT_READY(slaveReregisteredMessage);
- if (usage.get().has_disk_used_bytes()) {
- EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+ // Wait until disk usage can be retrieved.
+ Duration elapsed = Duration::zero();
+ while (true) {
+ Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+ AWAIT_READY(usage);
+
+ ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+ EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+ if (usage.get().has_disk_used_bytes()) {
+ EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+
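+ // NOTE: This is to capture the regression in MESOS-2452. The data
+ // stored in the executor meta directory should be less than 64K,
+ // so a reported usage above 64K indicates that the usage is not
+ // being measured on the meta directory.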
+ EXPECT_GT(usage.get().disk_used_bytes(), Kilobytes(64).bytes());
+ break;
+ }
+
+ ASSERT_LT(elapsed, Seconds(5));
+
+ os::sleep(Milliseconds(1));
+ elapsed += Milliseconds(1);
}
driver.stop();