You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/01/12 12:02:08 UTC

[1/2] mesos git commit: Added example framework converting disk resources.

Repository: mesos
Updated Branches:
  refs/heads/1.5.x 972697ac7 -> cc4f022ba
  refs/heads/master 370b5b634 -> 3d8ef23c0


Added example framework converting disk resources.

This patch introduces an example HTTP framework which transforms
'RAW' disk resources from resource providers into 'MOUNT' volumes and
subsequently unreserves them.

Review: https://reviews.apache.org/r/64932/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d8ef23c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d8ef23c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d8ef23c

Branch: refs/heads/master
Commit: 3d8ef23c0ecec028641d7beee4c85233495a030b
Parents: 370b5b6
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Jan 11 17:11:29 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Jan 12 12:28:52 2018 +0100

----------------------------------------------------------------------
 src/Makefile.am                          |   5 +
 src/examples/CMakeLists.txt              |   2 +
 src/examples/test_csi_user_framework.cpp | 435 ++++++++++++++++++++++++++
 3 files changed, 442 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8ef23c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bfe9eb1..191594b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2230,6 +2230,11 @@ test_http_framework_SOURCES = examples/test_http_framework.cpp
 test_http_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
 test_http_framework_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += test-csi-user-framework
+test_csi_user_framework_SOURCES = examples/test_csi_user_framework.cpp
+test_csi_user_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_csi_user_framework_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += test-framework
 test_framework_SOURCES = examples/test_framework.cpp
 test_framework_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8ef23c/src/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt
index d4f1af4..e85eed6 100644
--- a/src/examples/CMakeLists.txt
+++ b/src/examples/CMakeLists.txt
@@ -50,6 +50,7 @@ if (NOT WIN32)
   add_executable(test-framework                test_framework.cpp)
   add_executable(test-http-executor            test_http_executor.cpp)
   add_executable(test-http-framework           test_http_framework.cpp)
+  add_executable(test-csi-user-framework       test_csi_user_framework.cpp)
 
   # NOTE: Do not replace this with `link_libraries()`. While it may result in
   # less code, it is deprecated and relies on side effects instead of
@@ -82,4 +83,5 @@ if (NOT WIN32)
   target_link_libraries(test-framework                PRIVATE mesos)
   target_link_libraries(test-http-executor            PRIVATE mesos)
   target_link_libraries(test-http-framework           PRIVATE mesos)
+  target_link_libraries(test-csi-user-framework       PRIVATE mesos)
 endif ()

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d8ef23c/src/examples/test_csi_user_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_user_framework.cpp b/src/examples/test_csi_user_framework.cpp
new file mode 100644
index 0000000..91212e9
--- /dev/null
+++ b/src/examples/test_csi_user_framework.cpp
@@ -0,0 +1,435 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+using namespace mesos::v1;
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+using std::vector;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+// Refusal duration set in the filter of every `DECLINE` call sent by this
+// framework; the maximum representable duration.
+constexpr Seconds REFUSE_TIME = Seconds::max();
+
+// Name this framework sets in its `FrameworkInfo`.
+constexpr char FRAMEWORK_NAME[] =
+  "CSI User Event Call Scheduler using libprocess (C++)";
+
+
+/**
+ * Example framework performing CSI-related operations.
+ *
+ * This class implements a framework communicating with Mesos over the v1 HTTP
+ * API which transforms certain disk resources using CSI operations.
+ *
+ * It expects to be offered disk resources from resource providers reserved for
+ * its role (which likely should not be shared by any other framework in the
+ * cluster).  It will then convert `RAW` disk resources into `MOUNT` volumes and
+ * unreserve these `MOUNT` resources when they are offered again so that
+ * frameworks in other roles can consume the created disk resource.
+ *
+ * The bulk of the framework logic is implemented in `resourceOffers`.
+ */
+class HTTPScheduler : public process::Process<HTTPScheduler>
+{
+public:
+  // Creates a scheduler for `_framework` that connects to `_master`
+  // without handing a credential to the scheduler library.
+  HTTPScheduler(
+      const FrameworkInfo& _framework,
+      const string& _master)
+    : framework(_framework),
+      master(_master),
+      state(INITIALIZING) {}
+
+  // Creates a scheduler for `_framework` that connects to `_master` and
+  // hands `_credential` to the scheduler library in `initialize`.
+  HTTPScheduler(
+      const FrameworkInfo& _framework,
+      const string& _master,
+      const Credential& _credential)
+    : framework(_framework),
+      master(_master),
+      credential(_credential),
+      state(INITIALIZING) {}
+
+  ~HTTPScheduler() {}
+
+  // Callback registered with the scheduler library in `initialize`;
+  // starts (re-)subscribing with the master.
+  void connected()
+  {
+    doReliableRegistration();
+  }
+
+  // Callback registered with the scheduler library in `initialize`;
+  // records the disconnect so that the next `connected` subscribes again.
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  // Callback registered with the scheduler library in `initialize`.
+  // Handles the queued events front to back: `OFFERS` are forwarded to
+  // `resourceOffers`, while `UPDATE`, `ERROR` and a `FAILURE` without
+  // an agent ID terminate this process; all other events are only logged.
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
+
+          // Remember the assigned ID; it is required for all later calls.
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+          state = SUBSCRIBED;
+
+          cout << "Subscribed with ID " << framework.id() << endl;
+          break;
+        }
+
+        case Event::OFFERS: {
+          cout << endl << "Received an OFFERS event" << endl;
+          resourceOffers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::INVERSE_OFFERS: {
+          cout << endl << "Received an INVERSE_OFFERS event" << endl;
+          break;
+        }
+
+        case Event::RESCIND: {
+          cout << endl << "Received a RESCIND event" << endl;
+          break;
+        }
+
+        case Event::RESCIND_INVERSE_OFFER: {
+          cout << endl << "Received a RESCIND_INVERSE_OFFER event" << endl;
+          break;
+        }
+
+        case Event::UPDATE: {
+          // This framework never launches tasks, so a task status
+          // update is unexpected.
+          cout << endl << "Received an UPDATE event" << endl;
+          cout << "Unexpected UPDATE event: " << event.update().DebugString()
+               << endl;
+          process::terminate(self());
+          break;
+        }
+
+        // TODO(greggomann): Implement handling of offer operation updates.
+        case Event::UPDATE_OPERATION_STATUS: {
+          cout << endl << "Received an UPDATE_OPERATION_STATUS event" << endl;
+          break;
+        }
+
+        case Event::MESSAGE: {
+          cout << endl << "Received a MESSAGE event" << endl;
+          break;
+        }
+
+        case Event::FAILURE: {
+          cout << endl << "Received a FAILURE event" << endl;
+
+          if (event.failure().has_agent_id()) {
+            // Agent failed.
+            cout << "Agent '" << event.failure().agent_id().value()
+                 << "' terminated" << endl;
+          } else {
+            cout << "Unexpected FAILURE event: "
+                 << event.failure().DebugString() << endl;
+            process::terminate(self());
+          }
+          break;
+        }
+
+        case Event::ERROR: {
+          cout << endl << "Received an ERROR event: "
+               << event.error().message() << endl;
+          process::terminate(self());
+          break;
+        }
+
+        case Event::HEARTBEAT: {
+          cout << endl << "Received a HEARTBEAT event" << endl;
+          break;
+        }
+
+        case Event::UNKNOWN: {
+          LOG(WARNING) << "Received an UNKNOWN event and ignored";
+          break;
+        }
+      }
+    }
+  }
+
+protected:
+  void initialize() override
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    //
+    // The credential is `None` unless the three-argument constructor
+    // was used.
+    mesos.reset(
+        new scheduler::Mesos(
+            master,
+            mesos::ContentType::PROTOBUF,
+            process::defer(self(), &Self::connected),
+            process::defer(self(), &Self::disconnected),
+            process::defer(self(), &Self::received, lambda::_1),
+            credential));
+  }
+
+private:
+  // Answers every offer with exactly one call: an `ACCEPT` carrying one
+  // `CREATE_VOLUME` operation per reserved `RAW` disk and one
+  // `UNRESERVE` operation per reserved `MOUNT` disk, or a `DECLINE` if
+  // the offer contains no such resources.
+  void resourceOffers(const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      cout << "Received offer " << offer.id() << " with "
+           << Resources(offer.resources()) << endl;
+
+      // Filter resources and group interesting resources by resource provider.
+      //
+      // NOTE: We introduce a typedef here so we can use this type
+      // with the `foreachvalue` preprocessor macro below.
+      using ResourcesByType =
+        hashmap<Resource::DiskInfo::Source::Type, Resources>;
+
+      hashmap<ResourceProviderID, ResourcesByType> resourcesByProvider;
+
+      constexpr Resource::DiskInfo::Source::Type RAW =
+        Resource::DiskInfo::Source::RAW;
+
+      constexpr Resource::DiskInfo::Source::Type MOUNT =
+        Resource::DiskInfo::Source::MOUNT;
+
+      foreach (const Resource& resource, offer.resources()) {
+        // Ignore any resources not from a resource provider.
+        if (!resource.has_provider_id()) {
+          continue;
+        }
+
+        // We only convert reserved resources.
+        if (!Resources::isReserved(resource)) {
+          continue;
+        }
+
+        // We either work on `RAW` or `MOUNT` disk resources. Store
+        // them by type and ignore any other resources.
+        if (Resources::isDisk(resource, RAW)) {
+          resourcesByProvider[resource.provider_id()][RAW] += resource;
+        } else if (Resources::isDisk(resource, MOUNT)) {
+          resourcesByProvider[resource.provider_id()][MOUNT] += resource;
+        }
+      }
+
+      // Create operations.
+
+      // Create a single call which is either an accept and contains
+      // at least one operation, or a decline.
+      Call call;
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      // We create operations on resources from each resource provider
+      // separately as most operations currently cannot operate on multiple
+      // resource providers at once (`LAUNCH` being the obvious exception).
+      foreachvalue (
+          const ResourcesByType& resourcesByType,
+          resourcesByProvider) {
+        // Iterate over disk resources grouped by disk type since the
+        // performed operation depends on the type.
+        foreachpair (
+            const Resource::DiskInfo::Source::Type& type,
+            const Resources& resources,
+            resourcesByType) {
+          call.set_type(Call::ACCEPT);
+          Call::Accept* accept = call.mutable_accept();
+
+          // The offer ID must be added only once per call.
+          if (accept->offer_ids().empty()) {
+            accept->add_offer_ids()->CopyFrom(offer.id());
+          }
+
+          if (type == RAW) {
+            // We create `MOUNT` volumes out of `RAW` disk resources.
+            foreach (const Resource& resource, resources) {
+              cout << "Converting 'RAW' disk to 'MOUNT' disk" << endl;
+
+              Offer::Operation* operation = accept->add_operations();
+              operation->set_type(Offer::Operation::CREATE_VOLUME);
+
+              Offer::Operation::CreateVolume* create_volume =
+                operation->mutable_create_volume();
+
+              create_volume->mutable_source()->CopyFrom(resource);
+              create_volume->set_target_type(
+                  Resource::DiskInfo::Source::MOUNT);
+            }
+          } else if (type == MOUNT) {
+            // We unreserve `MOUNT` disk resources so they can be
+            // consumed by frameworks in other roles.
+            foreach (const Resource& resource, resources) {
+              cout << "Unreserving 'MOUNT' disk" << endl;
+
+              Offer::Operation* operation = accept->add_operations();
+              operation->set_type(Offer::Operation::UNRESERVE);
+
+              Offer::Operation::Unreserve* unreserve =
+                operation->mutable_unreserve();
+
+              unreserve->add_resources()->CopyFrom(resource);
+            }
+          }
+        }
+      }
+
+      // If we did not create any operations to accept the offer with,
+      // decline it instead.
+      if (!call.has_accept() || call.accept().operations().empty()) {
+        cout << "Declining offer" << endl;
+
+        call.clear_accept();
+        call.set_type(Call::DECLINE);
+        Call::Decline* decline = call.mutable_decline();
+        decline->add_offer_ids()->CopyFrom(offer.id());
+        decline->mutable_filters()->set_refuse_seconds(REFUSE_TIME.secs());
+      }
+
+      mesos->send(call);
+    }
+  }
+
+  // Sends a `SUBSCRIBE` call and reschedules itself every second until
+  // a `SUBSCRIBED` event has been received.
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED) {
+      return;
+    }
+
+    Call call;
+
+    // On a re-subscription we already know our ID and must present it.
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  // Framework description; its ID is filled in on `SUBSCRIBED`.
+  FrameworkInfo framework;
+
+  // Master address handed to the scheduler library.
+  string master;
+
+  // Credential handed to the scheduler library; `None` when the
+  // two-argument constructor was used.
+  Option<Credential> credential;
+
+  // Scheduler library instance; created in `initialize`.
+  process::Owned<scheduler::Mesos> mesos;
+
+  enum State
+  {
+    INITIALIZING = 0,
+    SUBSCRIBED = 1,
+    DISCONNECTED = 2
+  } state;
+};
+
+
+// Command line flags of this example framework, layered on top of the
+// Mesos logging flags.
+class Flags : public virtual mesos::internal::logging::Flags
+{
+public:
+  Flags()
+  {
+    add(&Flags::role,
+        "role",
+        "Role to use when registering",
+        "*");
+
+    // Registered without a default value.
+    add(&Flags::master,
+        "master",
+        "ip:port of master to connect");
+
+    add(&Flags::checkpoint,
+        "checkpoint",
+        "Whether this framework should be checkpointed.",
+        false);
+
+    // Registered without a default value.
+    add(&Flags::principal,
+        "principal",
+        "Authentication principal of the framework");
+  }
+
+  // Role added to the `FrameworkInfo`; defaults to "*".
+  string role;
+
+  // Master address handed to the scheduler.
+  string master;
+
+  // Principal set on the `FrameworkInfo`.
+  string principal;
+
+  // Checkpoint setting of the `FrameworkInfo`; defaults to `false`.
+  bool checkpoint;
+};
+
+
+// Parses the command line, assembles the `FrameworkInfo` and runs an
+// `HTTPScheduler` until that process terminates.
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<flags::Warnings> parsed = flags.load(None(), argc, argv);
+
+  // A help request is honored before load errors are reported.
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (parsed.isError()) {
+    cerr << flags.usage(parsed.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  // Catch signals.
+  mesos::internal::logging::initialize(argv[0], true, flags);
+
+  // Log any flag warnings.
+  foreach (const flags::Warning& warning, parsed->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  // The framework is registered under the current OS user.
+  const Result<string> username = os::user();
+  CHECK_SOME(username);
+
+  FrameworkInfo info;
+  info.set_name(FRAMEWORK_NAME);
+  info.set_user(username.get());
+  info.set_principal(flags.principal);
+  info.set_checkpoint(flags.checkpoint);
+  info.add_roles(flags.role);
+
+  FrameworkInfo::Capability* multiRole = info.add_capabilities();
+  multiRole->set_type(FrameworkInfo::Capability::MULTI_ROLE);
+
+  FrameworkInfo::Capability* refinement = info.add_capabilities();
+  refinement->set_type(FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+
+  process::Owned<HTTPScheduler> httpScheduler(
+      new HTTPScheduler(info, flags.master));
+
+  // Run the scheduler process and block until it has terminated.
+  process::spawn(httpScheduler.get());
+  process::wait(httpScheduler.get());
+
+  return EXIT_SUCCESS;
+}


[2/2] mesos git commit: Added example framework converting disk resources.

Posted by bb...@apache.org.
Added example framework converting disk resources.

This patch introduces an example HTTP framework which transforms
'RAW' disk resources from resource providers into 'MOUNT' volumes and
subsequently unreserves them.

Review: https://reviews.apache.org/r/64932/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc4f022b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc4f022b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc4f022b

Branch: refs/heads/1.5.x
Commit: cc4f022baec705a31e4de168070879399c50ea2a
Parents: 972697a
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Jan 11 17:11:29 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Jan 12 13:00:08 2018 +0100

----------------------------------------------------------------------
 src/Makefile.am                          |   5 +
 src/examples/CMakeLists.txt              |   2 +
 src/examples/test_csi_user_framework.cpp | 435 ++++++++++++++++++++++++++
 3 files changed, 442 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4f022b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bfe9eb1..191594b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2230,6 +2230,11 @@ test_http_framework_SOURCES = examples/test_http_framework.cpp
 test_http_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
 test_http_framework_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += test-csi-user-framework
+test_csi_user_framework_SOURCES = examples/test_csi_user_framework.cpp
+test_csi_user_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_csi_user_framework_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += test-framework
 test_framework_SOURCES = examples/test_framework.cpp
 test_framework_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4f022b/src/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt
index d4f1af4..e85eed6 100644
--- a/src/examples/CMakeLists.txt
+++ b/src/examples/CMakeLists.txt
@@ -50,6 +50,7 @@ if (NOT WIN32)
   add_executable(test-framework                test_framework.cpp)
   add_executable(test-http-executor            test_http_executor.cpp)
   add_executable(test-http-framework           test_http_framework.cpp)
+  add_executable(test-csi-user-framework       test_csi_user_framework.cpp)
 
   # NOTE: Do not replace this with `link_libraries()`. While it may result in
   # less code, it is deprecated and relies on side effects instead of
@@ -82,4 +83,5 @@ if (NOT WIN32)
   target_link_libraries(test-framework                PRIVATE mesos)
   target_link_libraries(test-http-executor            PRIVATE mesos)
   target_link_libraries(test-http-framework           PRIVATE mesos)
+  target_link_libraries(test-csi-user-framework       PRIVATE mesos)
 endif ()

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4f022b/src/examples/test_csi_user_framework.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_user_framework.cpp b/src/examples/test_csi_user_framework.cpp
new file mode 100644
index 0000000..91212e9
--- /dev/null
+++ b/src/examples/test_csi_user_framework.cpp
@@ -0,0 +1,435 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/numify.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+using namespace mesos::v1;
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+using std::vector;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+// Refusal duration set in the filter of every `DECLINE` call sent by this
+// framework; the maximum representable duration.
+constexpr Seconds REFUSE_TIME = Seconds::max();
+
+// Name this framework sets in its `FrameworkInfo`.
+constexpr char FRAMEWORK_NAME[] =
+  "CSI User Event Call Scheduler using libprocess (C++)";
+
+
+/**
+ * Example framework performing CSI-related operations.
+ *
+ * This class implements a framework communicating with Mesos over the v1 HTTP
+ * API which transforms certain disk resources using CSI operations.
+ *
+ * It expects to be offered disk resources from resource providers reserved for
+ * its role (which likely should not be shared by any other framework in the
+ * cluster).  It will then convert `RAW` disk resources into `MOUNT` volumes and
+ * unreserve these `MOUNT` resources when they are offered again so that
+ * frameworks in other roles can consume the created disk resource.
+ *
+ * The bulk of the framework logic is implemented in `resourceOffers`.
+ */
+class HTTPScheduler : public process::Process<HTTPScheduler>
+{
+public:
+  // Creates a scheduler for `_framework` that connects to `_master`
+  // without handing a credential to the scheduler library.
+  HTTPScheduler(
+      const FrameworkInfo& _framework,
+      const string& _master)
+    : framework(_framework),
+      master(_master),
+      state(INITIALIZING) {}
+
+  // Creates a scheduler for `_framework` that connects to `_master` and
+  // hands `_credential` to the scheduler library in `initialize`.
+  HTTPScheduler(
+      const FrameworkInfo& _framework,
+      const string& _master,
+      const Credential& _credential)
+    : framework(_framework),
+      master(_master),
+      credential(_credential),
+      state(INITIALIZING) {}
+
+  ~HTTPScheduler() {}
+
+  // Callback registered with the scheduler library in `initialize`;
+  // starts (re-)subscribing with the master.
+  void connected()
+  {
+    doReliableRegistration();
+  }
+
+  // Callback registered with the scheduler library in `initialize`;
+  // records the disconnect so that the next `connected` subscribes again.
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  // Callback registered with the scheduler library in `initialize`.
+  // Handles the queued events front to back: `OFFERS` are forwarded to
+  // `resourceOffers`, while `UPDATE`, `ERROR` and a `FAILURE` without
+  // an agent ID terminate this process; all other events are only logged.
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << endl << "Received a SUBSCRIBED event" << endl;
+
+          // Remember the assigned ID; it is required for all later calls.
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+          state = SUBSCRIBED;
+
+          cout << "Subscribed with ID " << framework.id() << endl;
+          break;
+        }
+
+        case Event::OFFERS: {
+          cout << endl << "Received an OFFERS event" << endl;
+          resourceOffers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::INVERSE_OFFERS: {
+          cout << endl << "Received an INVERSE_OFFERS event" << endl;
+          break;
+        }
+
+        case Event::RESCIND: {
+          cout << endl << "Received a RESCIND event" << endl;
+          break;
+        }
+
+        case Event::RESCIND_INVERSE_OFFER: {
+          cout << endl << "Received a RESCIND_INVERSE_OFFER event" << endl;
+          break;
+        }
+
+        case Event::UPDATE: {
+          // This framework never launches tasks, so a task status
+          // update is unexpected.
+          cout << endl << "Received an UPDATE event" << endl;
+          cout << "Unexpected UPDATE event: " << event.update().DebugString()
+               << endl;
+          process::terminate(self());
+          break;
+        }
+
+        // TODO(greggomann): Implement handling of offer operation updates.
+        case Event::UPDATE_OPERATION_STATUS: {
+          cout << endl << "Received an UPDATE_OPERATION_STATUS event" << endl;
+          break;
+        }
+
+        case Event::MESSAGE: {
+          cout << endl << "Received a MESSAGE event" << endl;
+          break;
+        }
+
+        case Event::FAILURE: {
+          cout << endl << "Received a FAILURE event" << endl;
+
+          if (event.failure().has_agent_id()) {
+            // Agent failed.
+            cout << "Agent '" << event.failure().agent_id().value()
+                 << "' terminated" << endl;
+          } else {
+            cout << "Unexpected FAILURE event: "
+                 << event.failure().DebugString() << endl;
+            process::terminate(self());
+          }
+          break;
+        }
+
+        case Event::ERROR: {
+          cout << endl << "Received an ERROR event: "
+               << event.error().message() << endl;
+          process::terminate(self());
+          break;
+        }
+
+        case Event::HEARTBEAT: {
+          cout << endl << "Received a HEARTBEAT event" << endl;
+          break;
+        }
+
+        case Event::UNKNOWN: {
+          LOG(WARNING) << "Received an UNKNOWN event and ignored";
+          break;
+        }
+      }
+    }
+  }
+
+protected:
+  void initialize() override
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    //
+    // The credential is `None` unless the three-argument constructor
+    // was used.
+    mesos.reset(
+        new scheduler::Mesos(
+            master,
+            mesos::ContentType::PROTOBUF,
+            process::defer(self(), &Self::connected),
+            process::defer(self(), &Self::disconnected),
+            process::defer(self(), &Self::received, lambda::_1),
+            credential));
+  }
+
+private:
+  // Answers every offer with exactly one call: an `ACCEPT` carrying one
+  // `CREATE_VOLUME` operation per reserved `RAW` disk and one
+  // `UNRESERVE` operation per reserved `MOUNT` disk, or a `DECLINE` if
+  // the offer contains no such resources.
+  void resourceOffers(const vector<Offer>& offers)
+  {
+    foreach (const Offer& offer, offers) {
+      cout << "Received offer " << offer.id() << " with "
+           << Resources(offer.resources()) << endl;
+
+      // Filter resources and group interesting resources by resource provider.
+      //
+      // NOTE: We introduce a typedef here so we can use this type
+      // with the `foreachvalue` preprocessor macro below.
+      using ResourcesByType =
+        hashmap<Resource::DiskInfo::Source::Type, Resources>;
+
+      hashmap<ResourceProviderID, ResourcesByType> resourcesByProvider;
+
+      constexpr Resource::DiskInfo::Source::Type RAW =
+        Resource::DiskInfo::Source::RAW;
+
+      constexpr Resource::DiskInfo::Source::Type MOUNT =
+        Resource::DiskInfo::Source::MOUNT;
+
+      foreach (const Resource& resource, offer.resources()) {
+        // Ignore any resources not from a resource provider.
+        if (!resource.has_provider_id()) {
+          continue;
+        }
+
+        // We only convert reserved resources.
+        if (!Resources::isReserved(resource)) {
+          continue;
+        }
+
+        // We either work on `RAW` or `MOUNT` disk resources. Store
+        // them by type and ignore any other resources.
+        if (Resources::isDisk(resource, RAW)) {
+          resourcesByProvider[resource.provider_id()][RAW] += resource;
+        } else if (Resources::isDisk(resource, MOUNT)) {
+          resourcesByProvider[resource.provider_id()][MOUNT] += resource;
+        }
+      }
+
+      // Create operations.
+
+      // Create a single call which is either an accept and contains
+      // at least one operation, or a decline.
+      Call call;
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      // We create operations on resources from each resource provider
+      // separately as most operations currently cannot operate on multiple
+      // resource providers at once (`LAUNCH` being the obvious exception).
+      foreachvalue (
+          const ResourcesByType& resourcesByType,
+          resourcesByProvider) {
+        // Iterate over disk resources grouped by disk type since the
+        // performed operation depends on the type.
+        foreachpair (
+            const Resource::DiskInfo::Source::Type& type,
+            const Resources& resources,
+            resourcesByType) {
+          call.set_type(Call::ACCEPT);
+          Call::Accept* accept = call.mutable_accept();
+
+          // The offer ID must be added only once per call.
+          if (accept->offer_ids().empty()) {
+            accept->add_offer_ids()->CopyFrom(offer.id());
+          }
+
+          if (type == RAW) {
+            // We create `MOUNT` volumes out of `RAW` disk resources.
+            foreach (const Resource& resource, resources) {
+              cout << "Converting 'RAW' disk to 'MOUNT' disk" << endl;
+
+              Offer::Operation* operation = accept->add_operations();
+              operation->set_type(Offer::Operation::CREATE_VOLUME);
+
+              Offer::Operation::CreateVolume* create_volume =
+                operation->mutable_create_volume();
+
+              create_volume->mutable_source()->CopyFrom(resource);
+              create_volume->set_target_type(
+                  Resource::DiskInfo::Source::MOUNT);
+            }
+          } else if (type == MOUNT) {
+            // We unreserve `MOUNT` disk resources so they can be
+            // consumed by frameworks in other roles.
+            foreach (const Resource& resource, resources) {
+              cout << "Unreserving 'MOUNT' disk" << endl;
+
+              Offer::Operation* operation = accept->add_operations();
+              operation->set_type(Offer::Operation::UNRESERVE);
+
+              Offer::Operation::Unreserve* unreserve =
+                operation->mutable_unreserve();
+
+              unreserve->add_resources()->CopyFrom(resource);
+            }
+          }
+        }
+      }
+
+      // If we did not create any operations to accept the offer with,
+      // decline it instead.
+      if (!call.has_accept() || call.accept().operations().empty()) {
+        cout << "Declining offer" << endl;
+
+        call.clear_accept();
+        call.set_type(Call::DECLINE);
+        Call::Decline* decline = call.mutable_decline();
+        decline->add_offer_ids()->CopyFrom(offer.id());
+        decline->mutable_filters()->set_refuse_seconds(REFUSE_TIME.secs());
+      }
+
+      mesos->send(call);
+    }
+  }
+
+  // Sends a `SUBSCRIBE` call and reschedules itself every second until
+  // a `SUBSCRIBED` event has been received.
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED) {
+      return;
+    }
+
+    Call call;
+
+    // On a re-subscription we already know our ID and must present it.
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  // Framework description; its ID is filled in on `SUBSCRIBED`.
+  FrameworkInfo framework;
+
+  // Master address handed to the scheduler library.
+  string master;
+
+  // Credential handed to the scheduler library; `None` when the
+  // two-argument constructor was used.
+  Option<Credential> credential;
+
+  // Scheduler library instance; created in `initialize`.
+  process::Owned<scheduler::Mesos> mesos;
+
+  enum State
+  {
+    INITIALIZING = 0,
+    SUBSCRIBED = 1,
+    DISCONNECTED = 2
+  } state;
+};
+
+
+// Command line flags of this example framework, layered on top of the
+// Mesos logging flags.
+class Flags : public virtual mesos::internal::logging::Flags
+{
+public:
+  Flags()
+  {
+    add(&Flags::role,
+        "role",
+        "Role to use when registering",
+        "*");
+
+    // Registered without a default value.
+    add(&Flags::master,
+        "master",
+        "ip:port of master to connect");
+
+    add(&Flags::checkpoint,
+        "checkpoint",
+        "Whether this framework should be checkpointed.",
+        false);
+
+    // Registered without a default value.
+    add(&Flags::principal,
+        "principal",
+        "Authentication principal of the framework");
+  }
+
+  // Role added to the `FrameworkInfo`; defaults to "*".
+  string role;
+
+  // Master address handed to the scheduler.
+  string master;
+
+  // Principal set on the `FrameworkInfo`.
+  string principal;
+
+  // Checkpoint setting of the `FrameworkInfo`; defaults to `false`.
+  bool checkpoint;
+};
+
+
+// Parses the command line, assembles the `FrameworkInfo` and runs an
+// `HTTPScheduler` until that process terminates.
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<flags::Warnings> parsed = flags.load(None(), argc, argv);
+
+  // A help request is honored before load errors are reported.
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (parsed.isError()) {
+    cerr << flags.usage(parsed.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  // Catch signals.
+  mesos::internal::logging::initialize(argv[0], true, flags);
+
+  // Log any flag warnings.
+  foreach (const flags::Warning& warning, parsed->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  // The framework is registered under the current OS user.
+  const Result<string> username = os::user();
+  CHECK_SOME(username);
+
+  FrameworkInfo info;
+  info.set_name(FRAMEWORK_NAME);
+  info.set_user(username.get());
+  info.set_principal(flags.principal);
+  info.set_checkpoint(flags.checkpoint);
+  info.add_roles(flags.role);
+
+  FrameworkInfo::Capability* multiRole = info.add_capabilities();
+  multiRole->set_type(FrameworkInfo::Capability::MULTI_ROLE);
+
+  FrameworkInfo::Capability* refinement = info.add_capabilities();
+  refinement->set_type(FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+
+  process::Owned<HTTPScheduler> httpScheduler(
+      new HTTPScheduler(info, flags.master));
+
+  // Run the scheduler process and block until it has terminated.
+  process::spawn(httpScheduler.get());
+  process::wait(httpScheduler.get());
+
+  return EXIT_SUCCESS;
+}