You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/26 01:38:16 UTC

[1/2] mesos git commit: Allowed Mesos containerizer to prepare and update volumes.

Repository: mesos
Updated Branches:
  refs/heads/master 727bf1f8c -> 795a615c7


Allowed Mesos containerizer to prepare and update volumes.

Review: https://reviews.apache.org/r/30510


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/795a615c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/795a615c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/795a615c

Branch: refs/heads/master
Commit: 795a615c796d9e051a0b0794e5ec24e5d39b1cdf
Parents: 8fbc524
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jan 30 16:38:53 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 25 16:36:23 2015 -0800

----------------------------------------------------------------------
 src/slave/containerizer/mesos/containerizer.cpp | 141 +++++++++++++++++--
 src/slave/containerizer/mesos/containerizer.hpp |   6 +
 src/tests/persistent_volume_tests.cpp           | 119 ++++++++++++++++
 3 files changed, 257 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index bb8564a..ec4626f 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -27,7 +27,9 @@
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
 
+#include <stout/fs.hpp>
 #include <stout/os.hpp>
+#include <stout/path.hpp>
 
 #include "module/manager.hpp"
 
@@ -452,15 +454,30 @@ Future<bool> MesosContainerizerProcess::launch(
     return false;
   }
 
+  LOG(INFO) << "Starting container '" << containerId
+            << "' for executor '" << executorInfo.executor_id()
+            << "' of framework '" << executorInfo.framework_id() << "'";
+
   Container* container = new Container();
-  container->resources = executorInfo.resources();
   container->directory = directory;
   container->state = PREPARING;
   containers_[containerId] = Owned<Container>(container);
 
-  LOG(INFO) << "Starting container '" << containerId
-            << "' for executor '" << executorInfo.executor_id()
-            << "' of framework '" << executorInfo.framework_id() << "'";
+  // Prepare volumes for the container.
+  // TODO(jieyu): Consider decoupling file system isolation from
+  // runtime isolation. The existing isolators are actually for
+  // runtime isolation. For file system isolation, the interface might
+  // be different and we always need a file system isolator. The
+  // following logic should be moved to the file system isolator.
+  Try<Nothing> update = updateVolumes(containerId, executorInfo.resources());
+  if (update.isError()) {
+    return Failure("Failed to prepare volumes: " + update.error());
+  }
+
+  // NOTE: We do not update 'container->resources' until volumes are
+  // prepared because 'updateVolumes' above depends on the current
+  // container resources.
+  container->resources = executorInfo.resources();
 
   return prepare(containerId, executorInfo, directory, user)
     .then(defer(self(),
@@ -797,7 +814,7 @@ Future<containerizer::Termination> MesosContainerizerProcess::wait(
 
 Future<Nothing> MesosContainerizerProcess::update(
     const ContainerID& containerId,
-    const Resources& _resources)
+    const Resources& resources)
 {
   if (!containers_.contains(containerId)) {
     // It is not considered a failure if the container is not known
@@ -808,19 +825,29 @@ Future<Nothing> MesosContainerizerProcess::update(
     return Nothing();
   }
 
-  if (containers_[containerId]->state == DESTROYING) {
+  const Owned<Container>& container = containers_[containerId];
+
+  if (container->state == DESTROYING) {
     LOG(WARNING) << "Ignoring update for currently being destroyed container: "
                  << containerId;
     return Nothing();
   }
 
-  // Store the resources for usage().
-  containers_[containerId]->resources = _resources;
+  // Update volumes for the container.
+  // TODO(jieyu): See comments above 'updateVolumes' in 'launch'.
+  Try<Nothing> update = updateVolumes(containerId, resources);
+  if (update.isError()) {
+    return Failure("Failed to update volumes: " + update.error());
+  }
+
+  // NOTE: We update container's resources before isolators are updated
+  // so that subsequent containerizer->update can be handled properly.
+  container->resources = resources;
 
   // Update each isolator.
   list<Future<Nothing>> futures;
   foreach (const Owned<Isolator>& isolator, isolators) {
-    futures.push_back(isolator->update(containerId, _resources));
+    futures.push_back(isolator->update(containerId, resources));
   }
 
   // Wait for all isolators to complete.
@@ -1172,6 +1199,102 @@ MesosContainerizerProcess::Metrics::~Metrics()
 }
 
 
+Try<Nothing> MesosContainerizerProcess::updateVolumes(
+    const ContainerID& containerId,
+    const Resources& updated)
+{
+  CHECK(containers_.contains(containerId));
+  const Owned<Container>& container = containers_[containerId];
+
+  // TODO(jieyu): Currently, we only allow non-nested relative
+  // container paths for volumes. This is enforced by the master. For
+  // those volumes, we create symlinks in the executor directory. No
+  // need to proceed if the container changes the file system root
+  // because the symlinks will become invalid if the file system root
+  // is changed. Consider moving this logic to the file system
+  // isolator and let the file system isolator decide how to mount
+  // those volumes (by either creating symlinks or doing bind mounts).
+  if (container->rootfs.isSome()) {
+    LOG(WARNING) << "Cannot update volumes for container " << containerId
+                 << " because it changes the file system root";
+    return Nothing();
+  }
+
+  Resources current = container->resources;
+
+  // First, remove symlinks for current volumes absent from 'updated'.
+  foreach (const Resource& resource, current.persistentVolumes()) {
+    // This is enforced by the master.
+    CHECK(resource.disk().has_volume());
+
+    // Ignore absolute and nested paths.
+    const string& containerPath = resource.disk().volume().container_path();
+    if (strings::contains(containerPath, "/")) {
+      LOG(WARNING) << "Skipping updating symlink for persistent volume "
+                   << resource << " of container " << containerId
+                   << " because the container path '" << containerPath
+                   << "' contains slash";
+      continue;
+    }
+
+    if (updated.contains(resource)) {
+      continue;
+    }
+
+    string link = path::join(container->directory, containerPath);
+
+    LOG(INFO) << "Removing symlink '" << link << "' for persistent volume "
+              << resource << " of container " << containerId;
+
+    Try<Nothing> rm = os::rm(link);
+    if (rm.isError()) {
+      return Error(
+          "Failed to remove the symlink for the unneeded persistent "
+          "volume at '" + link + "': " + rm.error());
+    }
+  }
+
+  // Then, add symlinks for volumes in 'updated' but not in 'current'.
+  foreach (const Resource& resource, updated.persistentVolumes()) {
+    // This is enforced by the master.
+    CHECK(resource.disk().has_volume());
+
+    // Ignore absolute and nested paths.
+    const string& containerPath = resource.disk().volume().container_path();
+    if (strings::contains(containerPath, "/")) {
+      LOG(WARNING) << "Skipping updating symlink for persistent volume "
+                   << resource << " of container " << containerId
+                   << " because the container path '" << containerPath
+                   << "' contains slash";
+      continue;
+    }
+
+    if (current.contains(resource)) {
+      continue;
+    }
+
+    string link = path::join(container->directory, containerPath);
+
+    string original = paths::getPersistentVolumePath(
+        flags.work_dir,
+        resource.role(),
+        resource.disk().persistence().id());
+
+    LOG(INFO) << "Adding symlink from '" << original << "' to '"
+              << link << "' for persistent volume " << resource
+              << " of container " << containerId;
+
+    Try<Nothing> symlink = fs::symlink(original, link);
+    if (symlink.isError()) {
+      return Error(
+          "Failed to symlink persistent volume from '" + original +
+          "' to '" + link + "': " + symlink.error());
+    }
+  }
+
+  return Nothing();
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 79a87c7..ae61a0f 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -228,6 +228,12 @@ private:
   // destroy.
   void reaped(const ContainerID& containerId);
 
+  // Updates the persistent volume symlinks in the container's directory
+  // by diffing its current resources against the given updated ones.
+  Try<Nothing> updateVolumes(
+      const ContainerID& containerId,
+      const Resources& updated);
+
   const Flags flags;
   const bool local;
   Fetcher* fetcher;

http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index a7d2900..7ab5646 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -32,6 +32,7 @@
 #include <stout/foreach.hpp>
 #include <stout/format.hpp>
 #include <stout/hashset.hpp>
+#include <stout/path.hpp>
 #include <stout/strings.hpp>
 
 #include <stout/os/exists.hpp>
@@ -119,6 +120,18 @@ protected:
     operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
     return operation;
   }
+
+  Offer::Operation LaunchOperation(const vector<TaskInfo>& tasks)
+  {
+    Offer::Operation launch;
+    launch.set_type(Offer::Operation::LAUNCH);
+    // Copy every task, in order, into a single LAUNCH operation.
+    for (const TaskInfo& taskInfo : tasks) {
+      launch.mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+    }
+
+    return launch;
+  }
 };
 
 
@@ -446,6 +459,7 @@ TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
   ASSERT_SOME(master);
 
   slave::Flags slaveFlags = CreateSlaveFlags();
+
   slaveFlags.checkpoint = true;
   slaveFlags.resources = "disk(role1):1024";
 
@@ -521,6 +535,111 @@ TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
   Shutdown();
 }
 
+
+// This test verifies that the containerizer links a persistent volume
+// into the executor directory, that the task can write to it via its
+// container path, and that the link is removed after the task finishes.
+TEST_F(PersistentVolumeTest, AccessPersistentVolume)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+
+  Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  slaveFlags.resources = "cpus:2;mem:1024;disk(role1):1024";
+
+  Try<PID<Slave>> slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  Resources volume = PersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1");
+
+  // Create a task which writes a file in the persistent volume.
+  Resources taskResources =
+    Resources::parse("cpus:1;mem:128;disk(role1):32").get() + volume;
+
+  TaskInfo task = createTask(
+      offer.slave_id(),
+      taskResources,
+      "echo -n abc > path1/file");
+
+  Future<TaskStatus> status1;
+  Future<TaskStatus> status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CreateOperation(volume),
+       LaunchOperation({task})});
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(task.task_id(), status1.get().task_id());
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(task.task_id(), status2.get().task_id());
+  EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+  // This is to verify that the persistent volume is correctly
+  // unlinked from the executor working directory after TASK_FINISHED
+  // is received by the scheduler (at which time the container's
+  // resources should already be updated).
+
+  // NOTE: The command executor's id is the same as the task id.
+  ExecutorID executorId;
+  executorId.set_value(task.task_id().value());
+
+  const string directory = slave::paths::getExecutorLatestRunPath(
+      slaveFlags.work_dir,
+      offer.slave_id(),
+      frameworkId.get(),
+      executorId);
+
+  EXPECT_FALSE(os::exists(path::join(directory, "path1")));
+
+  const string volumePath = slave::paths::getPersistentVolumePath(
+      slaveFlags.work_dir,
+      "role1",
+      "id1");
+
+  EXPECT_SOME_EQ("abc", os::read(path::join(volumePath, "file")));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[2/2] mesos git commit: Added validation for checkpointed resources during slave recovery.

Posted by ji...@apache.org.
Added validation for checkpointed resources during slave recovery.

Review: https://reviews.apache.org/r/30850


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8fbc5243
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8fbc5243
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8fbc5243

Branch: refs/heads/master
Commit: 8fbc524304d705aba42a3723dced2b80d3720622
Parents: 727bf1f
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Feb 11 10:33:03 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 25 16:36:23 2015 -0800

----------------------------------------------------------------------
 src/Makefile.am                       |  2 +
 src/common/resources_utils.cpp        | 67 +++++++++++++++++++++
 src/common/resources_utils.hpp        | 45 ++++++++++++++
 src/master/master.hpp                 | 39 +++----------
 src/slave/slave.cpp                   | 19 ++++++
 src/slave/slave.hpp                   |  3 +-
 src/slave/state.cpp                   |  2 +
 src/tests/mesos.cpp                   | 24 +++++---
 src/tests/mesos.hpp                   |  6 ++
 src/tests/persistent_volume_tests.cpp | 94 +++++++++++++++++++++++++++++-
 10 files changed, 261 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index f9e6975..17d0d7a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -296,6 +296,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	common/lock.cpp							\
 	common/protobuf_utils.cpp					\
 	common/resources.cpp						\
+	common/resources_utils.cpp					\
 	common/thread.cpp						\
 	common/type_utils.cpp						\
 	common/values.cpp						\
@@ -483,6 +484,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	common/lock.hpp							\
 	common/parse.hpp						\
 	common/protobuf_utils.hpp					\
+	common/resources_utils.hpp					\
 	common/status_utils.hpp						\
 	common/thread.hpp						\
 	credentials/credentials.hpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
new file mode 100644
index 0000000..fe04d57
--- /dev/null
+++ b/src/common/resources_utils.cpp
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/resources_utils.hpp"
+
+namespace mesos {
+
+bool needCheckpointing(const Resource& resource)
+{
+  // TODO(mpark): Consider framework reservations.
+  return Resources::isPersistentVolume(resource);
+}
+
+
+Try<Resources> applyCheckpointedResources(
+    const Resources& resources,
+    const Resources& checkpointedResources)
+{
+  Resources totalResources = resources;
+
+  foreach (const Resource& resource, checkpointedResources) {
+    if (!needCheckpointing(resource)) {
+      return Error("Unexpected checkpointed resources " + stringify(resource));
+    }
+
+    // TODO(jieyu): Apply RESERVE operation if 'resource' is
+    // dynamically reserved.
+
+    if (Resources::isPersistentVolume(resource)) {
+      Offer::Operation create;
+      create.set_type(Offer::Operation::CREATE);
+      create.mutable_create()->add_volumes()->CopyFrom(resource);
+
+      Try<Resources> applied = totalResources.apply(create);
+      if (applied.isError()) {
+        return Error(
+            "Cannot find transition for checkpointed resource " +
+            stringify(resource) + ": " + applied.error());
+      }
+
+      totalResources = applied.get();
+    }
+  }
+
+  return totalResources;
+}
+
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
new file mode 100644
index 0000000..3758ad5
--- /dev/null
+++ b/src/common/resources_utils.hpp
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RESOURCES_UTILS_HPP__
+#define __RESOURCES_UTILS_HPP__
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <stout/try.hpp>
+
+namespace mesos {
+
+// Returns true if the given resource needs to be checkpointed on the slave.
+// NOTE: We assume the given resource is validated.
+bool needCheckpointing(const Resource& resource);
+
+// Returns the total resources by applying the given checkpointed
+// resources to the given resources. This function is useful when we
+// want to calculate the total resources of a slave from the resources
+// specified on the command line and the checkpointed resources.
+// Returns an Error if a checkpointed resource is not expected to be
+// checkpointed or cannot be applied to the given resources.
+Try<Resources> applyCheckpointedResources(
+    const Resources& resources,
+    const Resources& checkpointedResources);
+
+} // namespace mesos {
+
+#endif // __RESOURCES_UTILS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d414061..e288cdb 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -49,6 +49,7 @@
 #include <stout/option.hpp>
 
 #include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
 
 #include "files/files.hpp"
 
@@ -759,31 +760,18 @@ struct Slave
       registeredTime(_registeredTime),
       connected(true),
       active(true),
-      totalResources(_info.resources()),
+      checkpointedResources(_checkpointedResources),
       observer(NULL)
   {
     CHECK(_info.has_id());
 
-    // We here infer offer operations from the given checkpointed
-    // resources and update total/checkpointed resources of this slave
-    // by calling 'apply(operation)'.
-    foreach (const Resource& resource, _checkpointedResources) {
-      // TODO(jieyu): Apply RESERVE operation if 'resource' is
-      // dynamically reserved.
-
-      if (Resources::isPersistentVolume(resource)) {
-        Offer::Operation create;
-        create.set_type(Offer::Operation::CREATE);
-        create.mutable_create()->add_volumes()->CopyFrom(resource);
-
-        // NOTE: Here, we assume master has already validated slave's
-        // checkpointed resources so that 'apply' will always succeed.
-        apply(create);
-      } else {
-        LOG(FATAL) << "Not expecting checkpointed resource "
-                   << resource << " from slave " << id;
-      }
-    }
+    Try<Resources> resources = applyCheckpointedResources(
+        info.resources(),
+        _checkpointedResources);
+
+    // NOTE: This should be validated during slave recovery.
+    CHECK_SOME(resources);
+    totalResources = resources.get();
 
     foreach (const ExecutorInfo& executorInfo, executorInfos) {
       CHECK(executorInfo.has_framework_id());
@@ -923,7 +911,6 @@ struct Slave
     CHECK_SOME(resources);
 
     totalResources = resources.get();
-
     checkpointedResources = totalResources.filter(needCheckpointing);
   }
 
@@ -983,14 +970,6 @@ struct Slave
 private:
   Slave(const Slave&);              // No copying.
   Slave& operator = (const Slave&); // No assigning.
-
-  // Returns true iff the resource needs to be checkpointed on the slave.
-  // TODO(mpark): Consider framework reservations.
-  // TODO(mpark): Factor this out to somewhere the slave can use it.
-  static bool needCheckpointing(const Resource& resource)
-  {
-    return Resources::isPersistentVolume(resource);
-  }
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e52ff5a..9f31fa4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -68,6 +68,7 @@
 
 #include "common/build.hpp"
 #include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
 #include "common/status_utils.hpp"
 
 #include "credentials/credentials.hpp"
@@ -3641,6 +3642,8 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
   }
 
   // Recover checkpointed resources.
+  // NOTE: 'resourcesState' is None if the slave rootDir does not
+  // exist or the resources checkpoint file cannot be found.
   if (resourcesState.isSome()) {
     if (resourcesState.get().errors > 0) {
       LOG(WARNING) << "Errors encountered during resources recovery: "
@@ -3649,6 +3652,22 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
       metrics.recovery_errors += resourcesState.get().errors;
     }
 
+    // This is to verify that the checkpointed resources are
+    // compatible with the slave resources specified through the
+    // '--resources' command line flag.
+    Try<Resources> totalResources = applyCheckpointedResources(
+        info.resources(),
+        resourcesState.get().resources);
+
+    if (totalResources.isError()) {
+      return Failure(
+          "Checkpointed resources " +
+          stringify(resourcesState.get().resources) +
+          " are incompatible with slave resources " +
+          stringify(info.resources()) + ": " +
+          totalResources.error());
+    }
+
     checkpointedResources = resourcesState.get().resources;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index f3eab7e..7b58cad 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -336,7 +336,8 @@ public:
       const Option<state::SlaveState>& state);
 
   // This is called when recovery finishes.
-  void __recover(const process::Future<Nothing>& future);
+  // Made 'virtual' for Slave mocking.
+  virtual void __recover(const process::Future<Nothing>& future);
 
   // Helper to recover a framework from the specified state.
   void recoverFramework(const state::FrameworkState& state);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 0329ba5..41f6c2c 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -50,6 +50,8 @@ Result<State> recover(const string& rootDir, bool strict)
     return Error(resources.error());
   }
 
+  // TODO(jieyu): Do not set 'state.resources' if we cannot find the
+  // resources checkpoint file.
   state.resources = resources.get();
 
   // Did the machine reboot? No need to recover slave state if the

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 851aaa2..23f790c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -358,14 +358,16 @@ MockSlave::MockSlave(const slave::Flags& flags,
       statusUpdateManager = new slave::StatusUpdateManager(flags))
 {
   // Set up default behaviors, calling the original methods.
-  EXPECT_CALL(*this, runTask(_, _, _, _, _)).
-      WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _runTask(_, _, _, _, _)).
-      WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
-  EXPECT_CALL(*this, killTask(_, _, _)).
-      WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
-  EXPECT_CALL(*this, removeFramework(_)).
-      WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+  EXPECT_CALL(*this, runTask(_, _, _, _, _))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
+  EXPECT_CALL(*this, _runTask(_, _, _, _, _))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
+  EXPECT_CALL(*this, killTask(_, _, _))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
+  EXPECT_CALL(*this, removeFramework(_))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+  EXPECT_CALL(*this, __recover(_))
+    .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
 }
 
 
@@ -412,6 +414,12 @@ void MockSlave::unmocked_removeFramework(slave::Framework* framework)
 }
 
 
+void MockSlave::unmocked___recover(const Future<Nothing>& future)
+{
+  slave::Slave::__recover(future);  // Qualified: bypasses the mock.
+}
+
+
 slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
 {
   slave::Flags flags = MesosTest::CreateSlaveFlags();

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index abf45c7..f7a0d05 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -659,6 +659,12 @@ public:
   void unmocked_removeFramework(
       slave::Framework* framework);
 
+  MOCK_METHOD1(__recover, void(
+      const process::Future<Nothing>& future));
+
+  void unmocked___recover(
+      const process::Future<Nothing>& future);
+
 private:
   Files files;
   MockGarbageCollector gc;

http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 2b67370..a7d2900 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -43,6 +43,7 @@
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 
 using namespace process;
@@ -54,11 +55,14 @@ using mesos::internal::slave::Slave;
 using std::string;
 using std::vector;
 
+using testing::_;
+using testing::Return;
+using testing::DoAll;
+
 namespace mesos {
 namespace internal {
 namespace tests {
 
-
 class PersistentVolumeTest : public MesosTest
 {
 protected:
@@ -429,6 +433,94 @@ TEST_F(PersistentVolumeTest, MasterFailover)
   Shutdown();
 }
 
+
+// This test verifies that a slave will refuse to start if the
+// checkpointed resources it recovers are not compatible with the
+// slave resources specified using the '--resources' flag.
+TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+
+  Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.checkpoint = true;
+  slaveFlags.resources = "disk(role1):1024";
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave1(slaveFlags, &detector, &containerizer);
+  spawn(slave1);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers.get().empty());
+
+  Offer offer = offers.get()[0];
+
+  Resources volume = PersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1");
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CreateOperation(volume)});
+
+  AWAIT_READY(checkpointResources);
+
+  terminate(slave1);
+  wait(slave1);
+
+  // Simulate a reboot of the slave machine by modifying the boot ID.
+  ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+      slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+      "rebooted! ;)"));
+
+  // Change the slave resources so that they are not compatible with
+  // the checkpointed resources.
+  slaveFlags.resources = "disk:1024";
+
+  MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+  Future<Future<Nothing>> recover;
+  EXPECT_CALL(slave2, __recover(_))
+    .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+  spawn(slave2);
+
+  AWAIT_READY(recover);
+  AWAIT_FAILED(recover.get());
+
+  terminate(slave2);
+  wait(slave2);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {