You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/26 01:38:16 UTC
[1/2] mesos git commit: Allowed Mesos containerizer to prepare and
update volumes.
Repository: mesos
Updated Branches:
refs/heads/master 727bf1f8c -> 795a615c7
Allowed Mesos containerizer to prepare and update volumes.
Review: https://reviews.apache.org/r/30510
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/795a615c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/795a615c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/795a615c
Branch: refs/heads/master
Commit: 795a615c796d9e051a0b0794e5ec24e5d39b1cdf
Parents: 8fbc524
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Jan 30 16:38:53 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 25 16:36:23 2015 -0800
----------------------------------------------------------------------
src/slave/containerizer/mesos/containerizer.cpp | 141 +++++++++++++++++--
src/slave/containerizer/mesos/containerizer.hpp | 6 +
src/tests/persistent_volume_tests.cpp | 119 ++++++++++++++++
3 files changed, 257 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/slave/containerizer/mesos/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp
index bb8564a..ec4626f 100644
--- a/src/slave/containerizer/mesos/containerizer.cpp
+++ b/src/slave/containerizer/mesos/containerizer.cpp
@@ -27,7 +27,9 @@
#include <process/reap.hpp>
#include <process/subprocess.hpp>
+#include <stout/fs.hpp>
#include <stout/os.hpp>
+#include <stout/path.hpp>
#include "module/manager.hpp"
@@ -452,15 +454,30 @@ Future<bool> MesosContainerizerProcess::launch(
return false;
}
+ LOG(INFO) << "Starting container '" << containerId
+ << "' for executor '" << executorInfo.executor_id()
+ << "' of framework '" << executorInfo.framework_id() << "'";
+
Container* container = new Container();
- container->resources = executorInfo.resources();
container->directory = directory;
container->state = PREPARING;
containers_[containerId] = Owned<Container>(container);
- LOG(INFO) << "Starting container '" << containerId
- << "' for executor '" << executorInfo.executor_id()
- << "' of framework '" << executorInfo.framework_id() << "'";
+ // Prepare volumes for the container.
+ // TODO(jieyu): Consider decoupling file system isolation from
+ // runtime isolation. The existing isolators are actually for
+ // runtime isolation. For file system isolation, the interface might
+ // be different and we always need a file system isolator. The
+ // following logic should be moved to the file system isolator.
+ Try<Nothing> update = updateVolumes(containerId, executorInfo.resources());
+ if (update.isError()) {
+ return Failure("Failed to prepare volumes: " + update.error());
+ }
+
+ // NOTE: We do not update 'container->resources' until volumes are
+ // prepared because 'updateVolumes' above depends on the current
+ // container resources.
+ container->resources = executorInfo.resources();
return prepare(containerId, executorInfo, directory, user)
.then(defer(self(),
@@ -797,7 +814,7 @@ Future<containerizer::Termination> MesosContainerizerProcess::wait(
Future<Nothing> MesosContainerizerProcess::update(
const ContainerID& containerId,
- const Resources& _resources)
+ const Resources& resources)
{
if (!containers_.contains(containerId)) {
// It is not considered a failure if the container is not known
@@ -808,19 +825,29 @@ Future<Nothing> MesosContainerizerProcess::update(
return Nothing();
}
- if (containers_[containerId]->state == DESTROYING) {
+ const Owned<Container>& container = containers_[containerId];
+
+ if (container->state == DESTROYING) {
LOG(WARNING) << "Ignoring update for currently being destroyed container: "
<< containerId;
return Nothing();
}
- // Store the resources for usage().
- containers_[containerId]->resources = _resources;
+ // Update volumes for the container.
+ // TODO(jieyu): See comments above 'updateVolumes' in 'launch'.
+ Try<Nothing> update = updateVolumes(containerId, resources);
+ if (update.isError()) {
+ return Failure("Failed to update volumes: " + update.error());
+ }
+
+ // NOTE: We update container's resources before isolators are updated
+ // so that subsequent containerizer->update can be handled properly.
+ container->resources = resources;
// Update each isolator.
list<Future<Nothing>> futures;
foreach (const Owned<Isolator>& isolator, isolators) {
- futures.push_back(isolator->update(containerId, _resources));
+ futures.push_back(isolator->update(containerId, resources));
}
// Wait for all isolators to complete.
@@ -1172,6 +1199,102 @@ MesosContainerizerProcess::Metrics::~Metrics()
}
+Try<Nothing> MesosContainerizerProcess::updateVolumes(
+ const ContainerID& containerId,
+ const Resources& updated)
+{
+ CHECK(containers_.contains(containerId));
+ const Owned<Container>& container = containers_[containerId];
+
+ // TODO(jieyu): Currently, we only allow non-nested relative
+ // container paths for volumes. This is enforced by the master. For
+ // those volumes, we create symlinks in the executor directory. No
+ // need to proceed if the container changes the file system root
+ // because the symlinks will become invalid if the file system root
+ // is changed. Consider moving this logic to the file system
+ // isolator and let the file system isolator decide how to mount
+ // those volumes (by either creating symlinks or doing bind mounts).
+ if (container->rootfs.isSome()) {
+ LOG(WARNING) << "Cannot update volumes for container " << containerId
+ << " because it changes the file system root";
+ return Nothing();
+ }
+
+ Resources current = container->resources;
+
+ // We first remove unneeded persistent volumes.
+ foreach (const Resource& resource, current.persistentVolumes()) {
+ // This is enforced by the master.
+ CHECK(resource.disk().has_volume());
+
+ // Ignore absolute and nested paths.
+ const string& containerPath = resource.disk().volume().container_path();
+ if (strings::contains(containerPath, "/")) {
+ LOG(WARNING) << "Skipping updating symlink for persistent volume "
+ << resource << " of container " << containerId
+ << " because the container path '" << containerPath
+ << "' contains slash";
+ continue;
+ }
+
+ if (updated.contains(resource)) {
+ continue;
+ }
+
+ string link = path::join(container->directory, containerPath);
+
+ LOG(INFO) << "Removing symlink '" << link << "' for persistent volume "
+ << resource << " of container " << containerId;
+
+ Try<Nothing> rm = os::rm(link);
+ if (rm.isError()) {
+ return Error(
+ "Failed to remove the symlink for the unneeded "
+ "persistent volume at '" + link + "'");
+ }
+ }
+
+ // We then link additional persistent volumes.
+ foreach (const Resource& resource, updated.persistentVolumes()) {
+ // This is enforced by the master.
+ CHECK(resource.disk().has_volume());
+
+ // Ignore absolute and nested paths.
+ const string& containerPath = resource.disk().volume().container_path();
+ if (strings::contains(containerPath, "/")) {
+ LOG(WARNING) << "Skipping updating symlink for persistent volume "
+ << resource << " of container " << containerId
+ << " because the container path '" << containerPath
+ << "' contains slash";
+ continue;
+ }
+
+ if (current.contains(resource)) {
+ continue;
+ }
+
+ string link = path::join(container->directory, containerPath);
+
+ string original = paths::getPersistentVolumePath(
+ flags.work_dir,
+ resource.role(),
+ resource.disk().persistence().id());
+
+ LOG(INFO) << "Adding symlink from '" << original << "' to '"
+ << link << "' for persistent volume " << resource
+ << " of container " << containerId;
+
+ Try<Nothing> symlink = fs::symlink(original, link);
+ if (symlink.isError()) {
+ return Error(
+ "Failed to symlink persistent volume from '" +
+ original + "' to '" + link + "'");
+ }
+ }
+
+ return Nothing();
+}
+
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/slave/containerizer/mesos/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp
index 79a87c7..ae61a0f 100644
--- a/src/slave/containerizer/mesos/containerizer.hpp
+++ b/src/slave/containerizer/mesos/containerizer.hpp
@@ -228,6 +228,12 @@ private:
// destroy.
void reaped(const ContainerID& containerId);
+ // Updates volumes for the given container according to its current
+ // resources and the given updated resources.
+ Try<Nothing> updateVolumes(
+ const ContainerID& containerId,
+ const Resources& updated);
+
const Flags flags;
const bool local;
Fetcher* fetcher;
http://git-wip-us.apache.org/repos/asf/mesos/blob/795a615c/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index a7d2900..7ab5646 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -32,6 +32,7 @@
#include <stout/foreach.hpp>
#include <stout/format.hpp>
#include <stout/hashset.hpp>
+#include <stout/path.hpp>
#include <stout/strings.hpp>
#include <stout/os/exists.hpp>
@@ -119,6 +120,18 @@ protected:
operation.mutable_destroy()->mutable_volumes()->CopyFrom(volumes);
return operation;
}
+
+ Offer::Operation LaunchOperation(const vector<TaskInfo>& tasks)
+ {
+ Offer::Operation operation;
+ operation.set_type(Offer::Operation::LAUNCH);
+
+ foreach (const TaskInfo& task, tasks) {
+ operation.mutable_launch()->add_task_infos()->CopyFrom(task);
+ }
+
+ return operation;
+ }
};
@@ -446,6 +459,7 @@ TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
ASSERT_SOME(master);
slave::Flags slaveFlags = CreateSlaveFlags();
+
slaveFlags.checkpoint = true;
slaveFlags.resources = "disk(role1):1024";
@@ -521,6 +535,111 @@ TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
Shutdown();
}
+
+// This test verifies that a persistent volume is correctly linked by
+// the containerizer and the task is able to access it according to
+// the container path it specifies.
+TEST_F(PersistentVolumeTest, AccessPersistentVolume)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+
+ Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ slaveFlags.resources = "cpus:2;mem:1024;disk(role1):1024";
+
+ Try<PID<Slave>> slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ Resources volume = PersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ // Create a task which writes a file in the persistent volume.
+ Resources taskResources =
+ Resources::parse("cpus:1;mem:128;disk(role1):32").get() + volume;
+
+ TaskInfo task = createTask(
+ offer.slave_id(),
+ taskResources,
+ "echo -n abc > path1/file");
+
+ Future<TaskStatus> status1;
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status1))
+ .WillOnce(FutureArg<1>(&status2));
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CreateOperation(volume),
+ LaunchOperation({task})});
+
+ AWAIT_READY(status1);
+ EXPECT_EQ(task.task_id(), status1.get().task_id());
+ EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(task.task_id(), status2.get().task_id());
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+ // This is to verify that the persistent volume is correctly
+ // unlinked from the executor working directory after TASK_FINISHED
+ // is received by the scheduler (at which time the container's
+ // resources should already be updated).
+
+ // NOTE: The command executor's id is the same as the task id.
+ ExecutorID executorId;
+ executorId.set_value(task.task_id().value());
+
+ const string& directory = slave::paths::getExecutorLatestRunPath(
+ slaveFlags.work_dir,
+ offer.slave_id(),
+ frameworkId.get(),
+ executorId);
+
+ EXPECT_FALSE(os::exists(path::join(directory, "path1")));
+
+ const string& volumePath = slave::paths::getPersistentVolumePath(
+ slaveFlags.work_dir,
+ "role1",
+ "id1");
+
+ EXPECT_SOME_EQ("abc", os::read(path::join(volumePath, "file")));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[2/2] mesos git commit: Added validation for checkpointed resources
during slave recovery.
Posted by ji...@apache.org.
Added validation for checkpointed resources during slave recovery.
Review: https://reviews.apache.org/r/30850
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8fbc5243
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8fbc5243
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8fbc5243
Branch: refs/heads/master
Commit: 8fbc524304d705aba42a3723dced2b80d3720622
Parents: 727bf1f
Author: Jie Yu <yu...@gmail.com>
Authored: Wed Feb 11 10:33:03 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Feb 25 16:36:23 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 2 +
src/common/resources_utils.cpp | 67 +++++++++++++++++++++
src/common/resources_utils.hpp | 45 ++++++++++++++
src/master/master.hpp | 39 +++----------
src/slave/slave.cpp | 19 ++++++
src/slave/slave.hpp | 3 +-
src/slave/state.cpp | 2 +
src/tests/mesos.cpp | 24 +++++---
src/tests/mesos.hpp | 6 ++
src/tests/persistent_volume_tests.cpp | 94 +++++++++++++++++++++++++++++-
10 files changed, 261 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index f9e6975..17d0d7a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -296,6 +296,7 @@ libmesos_no_3rdparty_la_SOURCES = \
common/lock.cpp \
common/protobuf_utils.cpp \
common/resources.cpp \
+ common/resources_utils.cpp \
common/thread.cpp \
common/type_utils.cpp \
common/values.cpp \
@@ -483,6 +484,7 @@ libmesos_no_3rdparty_la_SOURCES += \
common/lock.hpp \
common/parse.hpp \
common/protobuf_utils.hpp \
+ common/resources_utils.hpp \
common/status_utils.hpp \
common/thread.hpp \
credentials/credentials.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
new file mode 100644
index 0000000..fe04d57
--- /dev/null
+++ b/src/common/resources_utils.cpp
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/error.hpp>
+#include <stout/foreach.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/resources_utils.hpp"
+
+namespace mesos {
+
+bool needCheckpointing(const Resource& resource)
+{
+ // TODO(mpark): Consider framework reservations.
+ return Resources::isPersistentVolume(resource);
+}
+
+
+Try<Resources> applyCheckpointedResources(
+ const Resources& resources,
+ const Resources& checkpointedResources)
+{
+ Resources totalResources = resources;
+
+ foreach (const Resource& resource, checkpointedResources) {
+ if (!needCheckpointing(resource)) {
+ return Error("Unexpected checkpointed resources " + stringify(resource));
+ }
+
+ // TODO(jieyu): Apply RESERVE operation if 'resource' is
+ // dynamically reserved.
+
+ if (Resources::isPersistentVolume(resource)) {
+ Offer::Operation create;
+ create.set_type(Offer::Operation::CREATE);
+ create.mutable_create()->add_volumes()->CopyFrom(resource);
+
+ Try<Resources> applied = totalResources.apply(create);
+ if (applied.isError()) {
+ return Error(
+ "Cannot find transition for checkpointed resource " +
+ stringify(resource));
+ }
+
+ totalResources = applied.get();
+ }
+ }
+
+ return totalResources;
+}
+
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
new file mode 100644
index 0000000..3758ad5
--- /dev/null
+++ b/src/common/resources_utils.hpp
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RESOURCES_UTILS_HPP__
+#define __RESOURCES_UTILS_HPP__
+
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+
+#include <stout/try.hpp>
+
+namespace mesos {
+
+// Tests if the given Resource needs to be checkpointed on the slave.
+// NOTE: We assume the given resource is validated.
+bool needCheckpointing(const Resource& resource);
+
+// Returns the total resources by applying the given checkpointed
+// resources to the given resources. This function is useful when we
+// want to calculate the total resources of a slave from the resources
+// specified from the command line and the checkpointed resources.
+// Returns error if the given resources are not compatible with the
+// given checkpointed resources.
+Try<Resources> applyCheckpointedResources(
+ const Resources& resources,
+ const Resources& checkpointedResources);
+
+} // namespace mesos {
+
+#endif // __RESOURCES_UTILS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index d414061..e288cdb 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -49,6 +49,7 @@
#include <stout/option.hpp>
#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
#include "files/files.hpp"
@@ -759,31 +760,18 @@ struct Slave
registeredTime(_registeredTime),
connected(true),
active(true),
- totalResources(_info.resources()),
+ checkpointedResources(_checkpointedResources),
observer(NULL)
{
CHECK(_info.has_id());
- // We here infer offer operations from the given checkpointed
- // resources and update total/checkpointed resources of this slave
- // by calling 'apply(operation)'.
- foreach (const Resource& resource, _checkpointedResources) {
- // TODO(jieyu): Apply RESERVE operation if 'resource' is
- // dynamically reserved.
-
- if (Resources::isPersistentVolume(resource)) {
- Offer::Operation create;
- create.set_type(Offer::Operation::CREATE);
- create.mutable_create()->add_volumes()->CopyFrom(resource);
-
- // NOTE: Here, we assume master has already validated slave's
- // checkpointed resources so that 'apply' will always succeed.
- apply(create);
- } else {
- LOG(FATAL) << "Not expecting checkpointed resource "
- << resource << " from slave " << id;
- }
- }
+ Try<Resources> resources = applyCheckpointedResources(
+ info.resources(),
+ _checkpointedResources);
+
+ // NOTE: This should be validated during slave recovery.
+ CHECK_SOME(resources);
+ totalResources = resources.get();
foreach (const ExecutorInfo& executorInfo, executorInfos) {
CHECK(executorInfo.has_framework_id());
@@ -923,7 +911,6 @@ struct Slave
CHECK_SOME(resources);
totalResources = resources.get();
-
checkpointedResources = totalResources.filter(needCheckpointing);
}
@@ -983,14 +970,6 @@ struct Slave
private:
Slave(const Slave&); // No copying.
Slave& operator = (const Slave&); // No assigning.
-
- // Returns true iff the resource needs to be checkpointed on the slave.
- // TODO(mpark): Consider framework reservations.
- // TODO(mpark): Factor this out to somewhere the slave can use it.
- static bool needCheckpointing(const Resource& resource)
- {
- return Resources::isPersistentVolume(resource);
- }
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e52ff5a..9f31fa4 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -68,6 +68,7 @@
#include "common/build.hpp"
#include "common/protobuf_utils.hpp"
+#include "common/resources_utils.hpp"
#include "common/status_utils.hpp"
#include "credentials/credentials.hpp"
@@ -3641,6 +3642,8 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
}
// Recover checkpointed resources.
+ // NOTE: 'resourcesState' is None if the slave rootDir does not
+ // exist or the resources checkpoint file cannot be found.
if (resourcesState.isSome()) {
if (resourcesState.get().errors > 0) {
LOG(WARNING) << "Errors encountered during resources recovery: "
@@ -3649,6 +3652,22 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
metrics.recovery_errors += resourcesState.get().errors;
}
+ // This is to verify that the checkpointed resources are
+ // compatible with the slave resources specified through the
+ // '--resources' command line flag.
+ Try<Resources> totalResources = applyCheckpointedResources(
+ info.resources(),
+ resourcesState.get().resources);
+
+ if (totalResources.isError()) {
+ return Failure(
+ "Checkpointed resources " +
+ stringify(resourcesState.get().resources) +
+ " are incompatible with slave resources " +
+ stringify(info.resources()) + ": " +
+ totalResources.error());
+ }
+
checkpointedResources = resourcesState.get().resources;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index f3eab7e..7b58cad 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -336,7 +336,8 @@ public:
const Option<state::SlaveState>& state);
// This is called when recovery finishes.
- void __recover(const process::Future<Nothing>& future);
+ // Made 'virtual' for Slave mocking.
+ virtual void __recover(const process::Future<Nothing>& future);
// Helper to recover a framework from the specified state.
void recoverFramework(const state::FrameworkState& state);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/slave/state.cpp
----------------------------------------------------------------------
diff --git a/src/slave/state.cpp b/src/slave/state.cpp
index 0329ba5..41f6c2c 100644
--- a/src/slave/state.cpp
+++ b/src/slave/state.cpp
@@ -50,6 +50,8 @@ Result<State> recover(const string& rootDir, bool strict)
return Error(resources.error());
}
+ // TODO(jieyu): Do not set 'state.resources' if we cannot find the
+ // resources checkpoint file.
state.resources = resources.get();
// Did the machine reboot? No need to recover slave state if the
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index 851aaa2..23f790c 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -358,14 +358,16 @@ MockSlave::MockSlave(const slave::Flags& flags,
statusUpdateManager = new slave::StatusUpdateManager(flags))
{
// Set up default behaviors, calling the original methods.
- EXPECT_CALL(*this, runTask(_, _, _, _, _)).
- WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
- EXPECT_CALL(*this, _runTask(_, _, _, _, _)).
- WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
- EXPECT_CALL(*this, killTask(_, _, _)).
- WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
- EXPECT_CALL(*this, removeFramework(_)).
- WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+ EXPECT_CALL(*this, runTask(_, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
+ EXPECT_CALL(*this, _runTask(_, _, _, _, _))
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
+ EXPECT_CALL(*this, killTask(_, _, _))
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
+ EXPECT_CALL(*this, removeFramework(_))
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked_removeFramework));
+ EXPECT_CALL(*this, __recover(_))
+ .WillRepeatedly(Invoke(this, &MockSlave::unmocked___recover));
}
@@ -412,6 +414,12 @@ void MockSlave::unmocked_removeFramework(slave::Framework* framework)
}
+void MockSlave::unmocked___recover(const Future<Nothing>& future)
+{
+ slave::Slave::__recover(future);
+}
+
+
slave::Flags ContainerizerTest<slave::MesosContainerizer>::CreateSlaveFlags()
{
slave::Flags flags = MesosTest::CreateSlaveFlags();
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index abf45c7..f7a0d05 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -659,6 +659,12 @@ public:
void unmocked_removeFramework(
slave::Framework* framework);
+ MOCK_METHOD1(__recover, void(
+ const process::Future<Nothing>& future));
+
+ void unmocked___recover(
+ const process::Future<Nothing>& future);
+
private:
Files files;
MockGarbageCollector gc;
http://git-wip-us.apache.org/repos/asf/mesos/blob/8fbc5243/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 2b67370..a7d2900 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -43,6 +43,7 @@
#include "slave/paths.hpp"
#include "slave/slave.hpp"
+#include "tests/containerizer.hpp"
#include "tests/mesos.hpp"
using namespace process;
@@ -54,11 +55,14 @@ using mesos::internal::slave::Slave;
using std::string;
using std::vector;
+using testing::_;
+using testing::Return;
+using testing::DoAll;
+
namespace mesos {
namespace internal {
namespace tests {
-
class PersistentVolumeTest : public MesosTest
{
protected:
@@ -429,6 +433,94 @@ TEST_F(PersistentVolumeTest, MasterFailover)
Shutdown();
}
+
+// This test verifies that a slave will refuse to start if the
+// checkpointed resources it recovers are not compatible with the
+// slave resources specified using the '--resources' flag.
+TEST_F(PersistentVolumeTest, IncompatibleCheckpointedResources)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+
+ Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.checkpoint = true;
+ slaveFlags.resources = "disk(role1):1024";
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+ StandaloneMasterDetector detector(master.get());
+
+ MockSlave slave1(slaveFlags, &detector, &containerizer);
+ spawn(slave1);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+
+ Resources volume = PersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CreateOperation(volume)});
+
+ AWAIT_READY(checkpointResources);
+
+ terminate(slave1);
+ wait(slave1);
+
+ // Simulate a reboot of the slave machine by modifying the boot ID.
+ ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+ slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+ "rebooted! ;)"));
+
+ // Change the slave resources so that they're not compatible with the
+ // checkpointed resources.
+ slaveFlags.resources = "disk:1024";
+
+ MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+ Future<Future<Nothing>> recover;
+ EXPECT_CALL(slave2, __recover(_))
+ .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+ spawn(slave2);
+
+ AWAIT_READY(recover);
+ AWAIT_FAILED(recover.get());
+
+ terminate(slave2);
+ wait(slave2);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {