You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/02/09 20:37:02 UTC

mesos git commit: Started to maintain and checkpoint persisted resource in slave.

Repository: mesos
Updated Branches:
  refs/heads/master 6e554f6ef -> bf53ee56d


Started to maintain and checkpoint persisted resource in slave.

Review: https://reviews.apache.org/r/28809


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bf53ee56
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bf53ee56
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bf53ee56

Branch: refs/heads/master
Commit: bf53ee56d49bd11cc2477d0f026047c6745b8417
Parents: 6e554f6
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Jan 29 10:18:35 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Feb 9 11:33:06 2015 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp                   | 91 ++++++++++++++++++++++++++++++
 src/slave/slave.hpp                   |  5 ++
 src/slave/state.hpp                   |  4 ++
 src/tests/persistent_volume_tests.cpp | 72 ++++++++++++++++++++++-
 4 files changed, 171 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index fff2d72..7a29f86 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -394,6 +394,10 @@ void Slave::initialize()
       &UpdateFrameworkMessage::framework_id,
       &UpdateFrameworkMessage::pid);
 
+  install<CheckpointResourcesMessage>(
+      &Slave::checkpointResources,
+      &CheckpointResourcesMessage::resources);
+
   install<StatusUpdateAcknowledgementMessage>(
       &Slave::statusUpdateAcknowledgement,
       &StatusUpdateAcknowledgementMessage::slave_id,
@@ -969,12 +973,18 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     message.set_version(MESOS_VERSION);
     message.mutable_slave()->CopyFrom(info);
 
+    // Include checkpointed resources.
+    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
+
     send(master.get(), message);
   } else {
     // Re-registering, so send tasks running.
     ReregisterSlaveMessage message;
     message.set_version(MESOS_VERSION);
 
+    // Include checkpointed resources.
+    message.mutable_checkpointed_resources()->CopyFrom(checkpointedResources);
+
     // TODO(bmahler): Remove in 0.22.0.
     message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_slave()->CopyFrom(info);
@@ -1316,6 +1326,30 @@ void Slave::_runTask(
     return;
   }
 
+  // NOTE: If the task or executor uses persistent volumes, the slave
+  // should already know about it. In case the slave doesn't know
+  // about them (e.g., CheckpointResourcesMessage was dropped or came
+  // out of order), we simply fail the slave to be safe.
+  foreach (const Resource& resource, task.resources()) {
+    if (resource.has_disk() && resource.disk().has_persistence()) {
+      CHECK(checkpointedResources.contains(resource))
+        << "Unknown persistent volume " << resource
+        << " for task " << task.task_id()
+        << " of framework " << frameworkId;
+    }
+  }
+
+  if (task.has_executor()) {
+    foreach (const Resource& resource, task.executor().resources()) {
+      if (resource.has_disk() && resource.disk().has_persistence()) {
+        CHECK(checkpointedResources.contains(resource))
+          << "Unknown persistent volume " << resource
+          << " for executor " << task.executor().executor_id()
+          << " of framework " << frameworkId;
+      }
+    }
+  }
+
   // NOTE: The slave cannot be in 'RECOVERING' because the task would
   // have been rejected in 'runTask()' in that case.
   CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
@@ -1793,6 +1827,49 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
 }
 
 
+void Slave::checkpointResources(const vector<Resource>& _checkpointedResources)
+{
+  // Checkpoints the given resources (see the CHECK_SOME below) and
+  // then adopts them as the slave's 'checkpointedResources'.
+  //
+  // TODO(jieyu): Here we assume that CheckpointResourcesMessages are
+  // ordered (i.e., slave receives them in the order master sends
+  // them). This is usually true because TCP enforces in-order
+  // delivery per connection, but it is not guaranteed: master may
+  // create multiple connections to the slave (e.g., persistent socket
+  // to slave breaks and master uses ephemeral socket). Rejecting
+  // stale messages based on a version number could solve this.
+  //
+  // If CheckpointResourcesMessages are delivered out-of-order:
+  //  (1) If master does not fail over, it will reconcile the state
+  //      with the slave if the framework later changes the
+  //      checkpointed resources. Since master is the source of truth
+  //      for reservations, the inconsistency is not exposed to
+  //      frameworks.
+  //  (2) If master does fail over, the slave will inform the new
+  //      master about the incorrect checkpointed resources. When that
+  //      happens, we expect frameworks to reconcile based on the
+  //      offers they get.
+  Resources newCheckpointedResources = _checkpointedResources;
+
+  CHECK_SOME(state::checkpoint(
+      paths::getResourcesInfoPath(metaDir),
+      newCheckpointedResources))
+    << "Failed to checkpoint resources " << newCheckpointedResources;
+
+  // TODO(jieyu): Schedule gc for released persistent volumes. We need
+  // to consider dynamic reservation here because the framework can
+  // release a dynamic reservation while still wanting to keep the
+  // persistent volume.
+
+  LOG(INFO) << "Updated checkpointed resources from "
+            << checkpointedResources << " to "
+            << newCheckpointedResources;
+
+  checkpointedResources = newCheckpointedResources;
+}
+
+
 void Slave::statusUpdateAcknowledgement(
     const UPID& from,
     const SlaveID& slaveId,
@@ -3398,11 +3475,25 @@ Future<Nothing> Slave::recover(const Result<state::State>& state)
     return Failure(state.error());
   }
 
+  Option<ResourcesState> resourcesState;
   Option<SlaveState> slaveState;
   if (state.isSome()) {
+    resourcesState = state.get().resources;
     slaveState = state.get().slave;
   }
 
+  // Recover checkpointed resources.
+  if (resourcesState.isSome()) {
+    if (resourcesState.get().errors > 0) {
+      LOG(WARNING) << "Errors encountered during resources recovery: "
+                   << resourcesState.get().errors;
+
+      metrics.recovery_errors += resourcesState.get().errors;
+    }
+
+    checkpointedResources = resourcesState.get().resources;
+  }
+
   if (slaveState.isSome() && slaveState.get().info.isSome()) {
     // Check for SlaveInfo compatibility.
     // TODO(vinod): Also check for version compatibility.

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 9adee17..35c7c8b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -137,6 +137,8 @@ public:
 
   void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
 
+  void checkpointResources(const std::vector<Resource>& checkpointedResources);
+
   void registerExecutor(
       const process::UPID& from,
       const FrameworkID& frameworkId,
@@ -402,6 +404,9 @@ private:
 
   SlaveInfo info;
 
+  // Resources that are checkpointed by the slave.
+  Resources checkpointedResources;
+
   Option<process::UPID> master;
 
   hashmap<FrameworkID, Framework*> frameworks;

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 04084fc..dc1aea7 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -176,6 +176,10 @@ struct State
 
   Option<ResourcesState> resources;
   Option<SlaveState> slave;
+
+  // TODO(jieyu): Consider using a vector of Option<Error> here so
+  // that we can print all the errors. This also applies to all the
+  // State structs below.
   unsigned int errors;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bf53ee56/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index ffbaedd..e3a8d33 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -117,7 +117,7 @@ protected:
 // slave when the framework creates/destroys persistent volumes, and
 // the resources in the messages correctly reflect the resources that
 // need to be checkpointed on the slave.
-TEST_F(PersistentVolumeTest, CheckpointResources)
+TEST_F(PersistentVolumeTest, SendingCheckpointResourcesMessage)
 {
   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
   frameworkInfo.set_role("role1");
@@ -192,3 +192,73 @@ TEST_F(PersistentVolumeTest, CheckpointResources)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave checkpoints the resources for
+// persistent volumes to the disk, recovers them upon restart, and
+// sends them to the master during re-registration.
+TEST_F(PersistentVolumeTest, ResourcesCheckpointing)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role1");
+
+  Try<PID<Master>> master = StartMaster(MasterFlags({frameworkInfo}));
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.checkpoint = true;
+  slaveFlags.resources = "disk(role1):1024";
+
+  Try<PID<Slave>> slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers.get().empty()); // Fatal: offers.get()[0] is read below.
+
+  Offer offer = offers.get()[0];
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+
+  Resources volume = PersistentVolume(
+      Megabytes(64),
+      "role1",
+      "id1",
+      "path1");
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CreateOperation(volume)});
+
+  AWAIT_READY(checkpointResources);
+
+  // Restart the slave.
+  Stop(slave.get());
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(reregisterSlave);
+  EXPECT_EQ(Resources(reregisterSlave.get().checkpointed_resources()), volume);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}