You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/03/05 23:42:48 UTC

mesos git commit: Fixed a bug in PosixDiskIsolator during slave recovery.

Repository: mesos
Updated Branches:
  refs/heads/master dda6cb29f -> ceb136921


Fixed a bug in PosixDiskIsolator during slave recovery.

Review: https://reviews.apache.org/r/31772


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ceb13692
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ceb13692
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ceb13692

Branch: refs/heads/master
Commit: ceb136921df8317c163dfd32ba4c5e480d7f9738
Parents: dda6cb2
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Mar 5 12:13:37 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Mar 5 14:42:18 2015 -0800

----------------------------------------------------------------------
 src/slave/containerizer/mesos/containerizer.cpp |  14 +-
 src/slave/state.cpp                             |   7 -
 src/slave/state.hpp                             |   5 -
 src/tests/disk_quota_tests.cpp                  | 161 +++++++++++++++++--
 4 files changed, 165 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index ec4626f..fbd1c0a 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -354,10 +354,22 @@ Future<Nothing> MesosContainerizerProcess::recover(
                   << "' for executor '" << executor.id
                   << "' of framework " << framework.id;
 
+        // NOTE: We create the executor directory before checkpointing
+        // the executor. Therefore, it's not possible for this
+        // directory to be non-existent.
+        const string& directory = paths::getExecutorRunPath(
+            flags.work_dir,
+            state.get().id,
+            framework.id,
+            executor.id,
+            containerId);
+
+        CHECK(os::exists(directory));
+
         ExecutorRunState executorRunState(
             run.get().id.get(),
             run.get().forkedPid.get(),
-            run.get().directory);
+            directory);
 
         recoverable.push_back(executorRunState);
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 41f6c2c..35ce70b 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -399,13 +399,6 @@ Try<RunState> RunState::recover(
   state.id = containerId;
   string message;
 
-  state.directory = paths::getExecutorRunPath(
-      rootDir,
-      slaveId,
-      frameworkId,
-      executorId,
-      containerId);
-
   // See if the sentinel file exists. This is done first so it is
   // known even if partial state is returned, e.g., if the libprocess
   // pid file is not recovered. It indicates the slave removed the

http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 0201e72..31dfdd5 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -265,11 +265,6 @@ struct RunState
   Option<pid_t> forkedPid;
   Option<process::UPID> libprocessPid;
 
-  // NOTE: We create the executor directory before checkpointing the
-  // executor. Therefore, it's not possible for this directory to be
-  // non-existent.
-  std::string directory;
-
   // Executor terminated and all its updates acknowledged.
   bool completed;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ceb13692/src/tests/disk_quota_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/disk_quota_tests.cpp b/src/tests/disk_quota_tests.cpp
index 56d82ee..9c3a881 100644
--- a/src/tests/disk_quota_tests.cpp
+++ b/src/tests/disk_quota_tests.cpp
@@ -34,6 +34,7 @@
 
 #include "master/master.hpp"
 
+#include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/slave.hpp"
 
@@ -296,9 +297,7 @@ TEST_F(DiskQuotaTest, NoQuotaEnforcement)
       break;
     }
 
-    if (elapsed > Seconds(5)) {
-      FAIL() << "Failed to wait for process to be traced";
-    }
+    ASSERT_LT(elapsed, Seconds(5));
 
     os::sleep(Milliseconds(1));
     elapsed += Milliseconds(1);
@@ -377,14 +376,158 @@ TEST_F(DiskQuotaTest, ResourceStatistics)
 
   ContainerID containerId = *(containers.get().begin());
 
-  Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
-  AWAIT_READY(usage);
+  // Wait until disk usage can be retrieved.
+  Duration elapsed = Duration::zero();
+  while (true) {
+    Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+    AWAIT_READY(usage);
+
+    ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+    EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+    if (usage.get().has_disk_used_bytes()) {
+      EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+      break;
+    }
+
+    ASSERT_LT(elapsed, Seconds(5));
+
+    os::sleep(Milliseconds(1));
+    elapsed += Milliseconds(1);
+  }
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  delete containerizer.get();
+}
+
+
+// This test verifies that disk quota isolator recovers properly after
+// the slave restarts.
+TEST_F(DiskQuotaTest, SlaveRecovery)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+  flags.isolation = "posix/cpu,posix/mem,posix/disk";
+  flags.container_disk_watch_interval = Milliseconds(1);
+
+  Fetcher fetcher;
+
+  Try<MesosContainerizer*> containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(containerizer);
+
+  Try<PID<Slave>> slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());      // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  // Create a task that writes a 2MB file, within its 3MB disk resource.
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      Resources::parse("cpus:1;mem:128;disk:3").get(),
+      "dd if=/dev/zero of=file bs=1048576 count=2 && sleep 1000");
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(task.task_id(), status.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<hashset<ContainerID>> containers = containerizer.get()->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(1u, containers.get().size());
+
+  ContainerID containerId = *(containers.get().begin());
+
+  // Stop the slave.
+  Stop(slave.get());
+  delete containerizer.get();
+
+  Future<Nothing> _recover = FUTURE_DISPATCH(_, &Slave::_recover);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  containerizer = MesosContainerizer::create(flags, true, &fetcher);
+  ASSERT_SOME(containerizer);
+
+  slave = StartSlave(containerizer.get(), flags);
+  ASSERT_SOME(slave);
+
+  Clock::pause();
+
+  AWAIT_READY(_recover);
+
+  // Wait for slave to schedule reregister timeout.
+  Clock::settle();
+
+  // Ensure the slave considers itself recovered.
+  Clock::advance(slave::EXECUTOR_REREGISTER_TIMEOUT);
+
+  // NOTE: We resume the clock because we need the reaper to reap the
+  // 'du' subprocess.
+  Clock::resume();
 
-  ASSERT_TRUE(usage.get().has_disk_limit_bytes());
-  EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+  // Wait for the slave to re-register.
+  AWAIT_READY(slaveReregisteredMessage);
 
-  if (usage.get().has_disk_used_bytes()) {
-    EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+  // Wait until disk usage can be retrieved.
+  Duration elapsed = Duration::zero();
+  while (true) {
+    Future<ResourceStatistics> usage = containerizer.get()->usage(containerId);
+    AWAIT_READY(usage);
+
+    ASSERT_TRUE(usage.get().has_disk_limit_bytes());
+    EXPECT_EQ(Megabytes(3), Bytes(usage.get().disk_limit_bytes()));
+
+    if (usage.get().has_disk_used_bytes()) {
+      EXPECT_LE(usage.get().disk_used_bytes(), usage.get().disk_limit_bytes());
+
+      // NOTE: This catches the regression in MESOS-2452: the executor meta
+      // directory holds less than 64K, so usage above 64K implies the sandbox.
+      EXPECT_GT(usage.get().disk_used_bytes(), Kilobytes(64).bytes());
+      break;
+    }
+
+    ASSERT_LT(elapsed, Seconds(5));
+
+    os::sleep(Milliseconds(1));
+    elapsed += Milliseconds(1);
   }
 
   driver.stop();