You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/09 20:37:02 UTC
mesos git commit: Started to maintain and checkpoint persisted
resources in the slave.
Repository: mesos
Updated Branches:
refs/heads/master 6e554f6ef -> bf53ee56d
Started to maintain and checkpoint persisted resources in the slave.
Review: https://reviews.apache.org/r/28809
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf53ee56
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf53ee56
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf53ee56
Branch: refs/heads/master
Commit: bf53ee56d49bd11cc2477d0f026047c6745b8417
Parents: 6e554f6
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Jan 29 10:18:35 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Feb 9 11:33:06 2015 -0800
----------------------------------------------------------------------
src/slave/slave.cpp | 91 ++++++++++++++++++++++++++++++
src/slave/slave.hpp | 5 ++
src/slave/state.hpp | 4 ++
src/tests/persistent_volume_tests.cpp | 72 ++++++++++++++++++++++-
4 files changed, 171 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fff2d72..7a29f86 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -394,6 +394,10 @@ void Slave::initialize()
&UpdateFrameworkMessage::framework_id,
&UpdateFrameworkMessage::pid);
+ install<CheckpointResourcesMessage>(
+ &Slave::checkpointResources,
+ &CheckpointResourcesMessage::resources);
+
install<StatusUpdateAcknowledgementMessage>(
&Slave::statusUpdateAcknowledgement,
&StatusUpdateAcknowledgementMessage::slave_id,
@@ -969,12 +973,18 @@ void Slave::doReliableRegistration(Duration maxBackoff)
message.set_version(MESOS_VERSION);
message.mutable_slave()->CopyFrom(info);
+ // Include checkpointed resources.
+ message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
+
send(master.get(), message);
} else {
// Re-registering, so send tasks running.
ReregisterSlaveMessage message;
message.set_version(MESOS_VERSION);
+ // Include checkpointed resources.
+ message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
+
// TODO(bmahler): Remove in 0.22.0.
message.mutable_slave_id()->CopyFrom(info.id());
message.mutable_slave()->CopyFrom(info);
@@ -1316,6 +1326,30 @@ void Slave::_runTask(
return;
}
+ // NOTE: If the task or executor uses persistent volumes, the slave
+ // should already know about them. In case the slave doesn't know
+ // about them (e.g., CheckpointResourcesMessage was dropped or came
+ // out of order), we simply fail the slave to be safe.
+ foreach (const Resource& resource, task.resources()) {
+ if (resource.has_disk() && resource.disk().has_persistence()) {
+ CHECK(checkpointedResources.contains(resource))
+ << "Unknown persistent volume " << resource
+ << " for task " << task.task_id()
+ << " of framework " << frameworkId;
+ }
+ }
+
+ if (task.has_executor()) {
+ foreach (const Resource& resource, task.executor().resources()) {
+ if (resource.has_disk() && resource.disk().has_persistence()) {
+ CHECK(checkpointedResources.contains(resource))
+ << "Unknown persistent volume " << resource
+ << " for executor " << task.executor().executor_id()
+ << " of framework " << frameworkId;
+ }
+ }
+ }
+
// NOTE: The slave cannot be in 'RECOVERING' because the task would
// have been rejected in 'runTask()' in that case.
CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
@@ -1793,6 +1827,49 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
}
+void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
+{
+ // TODO(jieyu): Here we assume that CheckpointResourcesMessages are
+ // ordered (i.e., the slave receives them in the same order the master
+ // sends them). This should be true in most cases because TCP
+ // enforces in-order delivery per connection. However, the ordering
+ // is technically not guaranteed because the master creates multiple
+ // connections to the slave in some cases (e.g., the persistent socket
+ // to the slave breaks and the master uses an ephemeral socket). This
+ // could potentially be solved by using a version number and
+ // rejecting stale messages according to the version number.
+ //
+ // If CheckpointResourcesMessages are delivered out of order, there
+ // are two cases to consider:
+ // (1) If the master does not fail over, it will reconcile the state
+ // with the slave if the framework later changes the
+ // checkpointed resources. Since the master is the source of truth
+ // for reservations, the inconsistency is not exposed to
+ // frameworks.
+ // (2) If the master does fail over, the slave will inform the new
+ // master about the incorrect checkpointed resources. When that
+ // happens, we expect frameworks to reconcile based on the
+ // offers they get.
+ Resources newCheckpointedResources = _checkpointedResources;
+ // Write to disk first; the in-memory copy is only replaced below.
+ CHECK_SOME(state::checkpoint(
+ paths::getResourcesInfoPath(metaDir),
+ newCheckpointedResources))
+ << "Failed to checkpoint resources " << newCheckpointedResources;
+
+ // TODO(jieyu): Schedule gc for released persistent volumes. We need
+ // to consider dynamic reservations here because the framework can
+ // release a dynamic reservation while still wanting to keep the
+ // persistent volume.
+
+ LOG(INFO) << "Updated checkpointed resources from "
+ << checkpointedResources << " to "
+ << newCheckpointedResources;
+
+ checkpointedResources = newCheckpointedResources;
+}
+
+
void Slave::statusUpdateAcknowledgement(
const UPID& from,
const SlaveID& slaveId,
@@ -3398,11 +3475,25 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
return Failure(state.error());
}
+ Option<ResourcesState> resourcesState;
Option<SlaveState> slaveState;
if (state.isSome()) {
+ resourcesState = state.get().resources;
slaveState = state.get().slave;
}
+ // Recover checkpointed resources.
+ if (resourcesState.isSome()) {
+ if (resourcesState.get().errors > 0) {
+ LOG(WARNING) << "Errors encountered during resources recovery: "
+ << resourcesState.get().errors;
+
+ metrics.recovery_errors += resourcesState.get().errors;
+ }
+
+ checkpointedResources = resourcesState.get().resources;
+ }
+
if (slaveState.isSome() && slaveState.get().info.isSome()) {
// Check for SlaveInfo compatibility.
// TODO(vinod): Also check for version compatibility.
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 9adee17..35c7c8b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -137,6 +137,8 @@ public:
void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
+ void checkpointResources(const std::vector<Resource>& checkpointedResources);
+
void registerExecutor(
const process::UPID& from,
const FrameworkID& frameworkId,
@@ -402,6 +404,9 @@ private:
SlaveInfo info;
+ // Resources that are checkpointed by the slave.
+ Resources checkpointedResources;
+
Option<process::UPID> master;
hashmap<FrameworkID, Framework*> frameworks;
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 04084fc..dc1aea7 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -176,6 +176,10 @@ struct State
Option<ResourcesState> resources;
Option<SlaveState> slave;
+
+ // TODO(jieyu): Consider using a vector of Option<Error> here so
+ // that we can print all the errors. This also applies to all the
+ // State structs below.
unsigned int errors;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index ffbaedd..e3a8d33 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -117,7 +117,7 @@ protected:
// slave when the framework creates/destroys persistent volumes, and
// the resources in the messages correctly reflect the resources that
// need to be checkpointed on the slave.
-TEST_F(PersistentVolumeTest, CheckpointResources)
+TEST_F(PersistentVolumeTest, SendingCheckpointResourcesMessage)
{
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
frameworkInfo.set_role("role1");
@@ -192,3 +192,73 @@ TEST_F(PersistentVolumeTest, CheckpointResources)
Shutdown();
}
+
+
+// This test verifies that the slave checkpoints the resources for
+// persistent volumes to disk, recovers them upon restart, and
+// sends them to the master during re-registration.
+TEST_F(PersistentVolumeTest, ResourcesCheckpointing)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role1");
+
+ Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.checkpoint = true;
+ slaveFlags.resources = "disk(role1):1024";
+
+ Try<PID<Slave>> slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_FALSE(offers.get().empty());
+
+ Offer offer = offers.get()[0];
+ // Capture the CheckpointResourcesMessage sent to the slave.
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+ // The volume the framework asks the master to create below.
+ Resources volume = PersistentVolume(
+ Megabytes(64),
+ "role1",
+ "id1",
+ "path1");
+
+ driver.acceptOffers(
+ {offer.id()},
+ {CreateOperation(volume)});
+
+ AWAIT_READY(checkpointResources);
+ // NOTE(review): confirm the slave has processed the message before
+ // it is stopped; the future may fire first. Restart the slave.
+ Stop(slave.get());
+
+ Future<ReregisterSlaveMessage> reregisterSlave =
+ FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+ slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+ // Re-registration must report the volume checkpointed earlier.
+ AWAIT_READY(reregisterSlave);
+ EXPECT_EQ(Resources(reregisterSlave.get().checkpointed_resources()), volume);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}