You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ti...@apache.org on 2018/10/26 20:54:43 UTC

[mesos] branch master updated: Added example framework for inverse-offers.

This is an automated email from the ASF dual-hosted git repository.

tillt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new e012f30  Added example framework for inverse-offers.
e012f30 is described below

commit e012f308cd8feff9b24fba19fab0886f0154b16d
Author: Till Toenshoff <to...@me.com>
AuthorDate: Fri Oct 26 22:54:20 2018 +0200

    Added example framework for inverse-offers.
    
    Adds an example framework displaying how to handle inverse offers.
    This example is based on the original review request
    https://reviews.apache.org/r/50010/ by Joseph Wu.
    Some changes were applied adding framework authentication
    capabilities, updated PullGauge metrics and other minor adaptations
    following the other example frameworks.
    
    Review: https://reviews.apache.org/r/68812/
---
 src/Makefile.am                          |   5 +
 src/examples/CMakeLists.txt              |   2 +
 src/examples/inverse_offer_framework.cpp | 646 +++++++++++++++++++++++++++++++
 3 files changed, 653 insertions(+)

diff --git a/src/Makefile.am b/src/Makefile.am
index 5795c70..c17eae4 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2275,6 +2275,11 @@ balloon_executor_SOURCES = examples/balloon_executor.cpp
 balloon_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 balloon_executor_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += inverse-offer-framework
+inverse_offer_framework_SOURCES = examples/inverse_offer_framework.cpp
+inverse_offer_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
+inverse_offer_framework_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += load-generator-framework
 load_generator_framework_SOURCES = examples/load_generator_framework.cpp
 load_generator_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt
index a2c0dd1..f477539 100644
--- a/src/examples/CMakeLists.txt
+++ b/src/examples/CMakeLists.txt
@@ -41,6 +41,7 @@ if (NOT WIN32)
   add_executable(disk-full-framework           disk_full_framework.cpp)
   add_executable(docker-no-executor-framework  docker_no_executor_framework.cpp)
   add_executable(dynamic-reservation-framework dynamic_reservation_framework.cpp)
+  add_executable(inverse-offer-framework       inverse_offer_framework.cpp)
   add_executable(load-generator-framework      load_generator_framework.cpp)
   add_executable(long-lived-executor           long_lived_executor.cpp)
   add_executable(long-lived-framework          long_lived_framework.cpp)
@@ -79,6 +80,7 @@ if (NOT WIN32)
   target_link_libraries(disk-full-framework           PRIVATE mesos)
   target_link_libraries(docker-no-executor-framework  PRIVATE mesos)
   target_link_libraries(dynamic-reservation-framework PRIVATE mesos)
+  target_link_libraries(inverse-offer-framework       PRIVATE mesos)
   target_link_libraries(load-generator-framework      PRIVATE mesos)
   target_link_libraries(long-lived-executor           PRIVATE mesos)
   target_link_libraries(long-lived-framework          PRIVATE mesos)
diff --git a/src/examples/inverse_offer_framework.cpp b/src/examples/inverse_offer_framework.cpp
new file mode 100644
index 0000000..389a6ab
--- /dev/null
+++ b/src/examples/inverse_offer_framework.cpp
@@ -0,0 +1,646 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/authorizer/acls.hpp>
+
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+#include <mesos/v1/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/http.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/time.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/metrics.hpp>
+#include <process/metrics/pull_gauge.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/flags.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stringify.hpp>
+
+#include "examples/flags.hpp"
+
+#include "logging/logging.hpp"
+
+using namespace mesos::v1;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+using process::Clock;
+using process::defer;
+
+using process::http::OK;
+
+using process::metrics::Counter;
+using process::metrics::PullGauge;
+
+const float CPUS_PER_TASK = 0.2; // "cpus" requested for each sleep task.
+const int32_t MEM_PER_TASK = 32; // "mem" requested for each sleep task.
+
+constexpr char FRAMEWORK_NAME[] = "Inverse Offer Framework (C++)";
+constexpr char FRAMEWORK_METRICS_PREFIX[] = "inverse_offer_framework";
+
+
+// Tracks one sleep task and the start of its agent's planned maintenance.
+struct SleeperInfo
+{
+  TaskID taskId;
+  TimeInfo unavailability; // Left at its default when none is known.
+};
+
+
+// This scheduler launches and maintains a configurable number of
+// infinite-sleep tasks, placing at most one task on a single agent.
+// When the operator schedules maintenance on the cluster, the scheduler
+// will respond by migrating sleep tasks ahead of the planned maintenance.
+class InverseOfferScheduler : public process::Process<InverseOfferScheduler>
+{
+public:
+  InverseOfferScheduler(
+      const FrameworkInfo& _framework,
+      const std::string& _master,
+      const uint32_t _num_tasks,
+      const Option<Credential>& _credential)
+    : framework(_framework),
+      master(_master),
+      num_tasks(_num_tasks),
+      credential(_credential),
+      tasks_launched(0),
+      state(DISCONNECTED),
+      metrics(*this)
+  {
+    start_time = Clock::now();
+  }
+
+  ~InverseOfferScheduler() {}
+
+protected:
+  virtual void initialize()
+  {
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new scheduler::Mesos(
+        master,
+        mesos::ContentType::PROTOBUF,
+        process::defer(self(), &Self::connected),
+        process::defer(self(), &Self::disconnected),
+        process::defer(self(), &Self::received, lambda::_1),
+        credential));
+  }
+
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    LOG(INFO) << "Disconnected!";
+
+    state = DISCONNECTED;
+  }
+
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return; // Already subscribed, or no connection to retry on.
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    if (framework.has_id()) {
+      call.mutable_framework_id()->CopyFrom(framework.id());
+    }
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(framework);
+
+    mesos->send(call);
+
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void received(std::queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      LOG(INFO) << "Received " << event.type() << " event";
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          framework.mutable_id()->CopyFrom(event.subscribed().framework_id());
+
+          LOG(INFO) << "Subscribed with ID '" << framework.id() << "'";
+          state = SUBSCRIBED;
+          break;
+        }
+
+        case Event::OFFERS: {
+          metrics.offers_received += event.offers().offers().size();
+
+          resourceOffers(google::protobuf::convert(event.offers().offers()));
+          break;
+        }
+
+        case Event::INVERSE_OFFERS: {
+          metrics.inverse_offers_received +=
+            event.inverse_offers().inverse_offers().size();
+
+          inverseOffers(google::protobuf::convert(
+              event.inverse_offers().inverse_offers()));
+          break;
+        }
+
+        case Event::UPDATE: {
+          statusUpdate(event.update().status());
+          break;
+        }
+
+        // TODO(greggomann): Implement handling of operation status updates.
+        case Event::UPDATE_OPERATION_STATUS:
+          break;
+
+        case Event::FAILURE: {
+          const Event::Failure& failure = event.failure();
+
+          if (failure.has_agent_id() && failure.has_executor_id()) {
+            LOG(INFO)
+              << "Executor '" << failure.executor_id()
+              << "' lost on agent '" << failure.agent_id()
+              << (failure.has_status() ?
+                  "' with status: " + stringify(failure.status()) : "'");
+          } else {
+            CHECK(failure.has_agent_id());
+
+            LOG(INFO) << "Agent lost: " << failure.agent_id();
+          }
+          break;
+        }
+
+        case Event::ERROR: {
+          EXIT(EXIT_FAILURE) << "Error: " << event.error().message();
+          break;
+        }
+
+        case Event::HEARTBEAT:
+        case Event::RESCIND:
+        case Event::RESCIND_INVERSE_OFFER:
+        case Event::MESSAGE: {
+          break;
+        }
+
+        case Event::UNKNOWN: {
+          LOG(WARNING) << "Received an UNKNOWN event and ignored";
+          break;
+        }
+      }
+    }
+  }
+
+private:
+  void resourceOffers(const std::vector<Offer>& offers)
+  {
+    CHECK(framework.has_id());
+
+    // Of existing sleep tasks, identify the one running on an agent
+    // with the least expected uptime (i.e. next to be maintained).
+    // We'll see if we can migrate this sleep task.
+    Option<AgentID> riskiestAgent;
+    foreachpair (const AgentID& agentId, const SleeperInfo& sleeper, sleepers) {
+      // Sleepers without scheduled maintenance (a start of 0) must never be
+      // picked, otherwise they would always compare as the "riskiest".
+      const int64_t start = sleeper.unavailability.nanoseconds();
+      if (start > 0 &&
+          (riskiestAgent.isNone() || start <
+            sleepers[riskiestAgent.get()].unavailability.nanoseconds())) {
+        riskiestAgent = agentId;
+      }
+    }
+
+    foreach (const Offer& offer, offers) {
+      const Resources taskResources = [this]() {
+        Resources resources = Resources::parse(
+            "cpus:" + stringify(CPUS_PER_TASK) +
+            ";mem:" + stringify(MEM_PER_TASK)).get();
+        resources.allocate(framework.role());
+        return resources;
+      }();
+
+      // Are there already `num_tasks` sleep tasks running?
+      // Having `num_tasks` sleeps running takes priority over dealing
+      // with maintenance.
+      bool needMoreSleep = sleepers.size() < num_tasks;
+
+      // Is the agent in the offer less risky than our riskiest agent?
+      // i.e. The offered agent's planned downtime is farther away.
+      bool offeredAgentIsLessRisky = riskiestAgent.isSome() &&
+          (!offer.has_unavailability() ||
+            offer.unavailability().start().nanoseconds() >
+            sleepers[riskiestAgent.get()].unavailability.nanoseconds());
+
+      // Are we already running a task on this agent?
+      // This scheduler will only launch one task per agent.
+      bool offeredAgentIsOccupied = sleepers.contains(offer.agent_id());
+
+      // We only need to accept an offer if we do not have enough sleep
+      // tasks active, or the offer provides a better agent.
+      bool needToLaunchTask = !offeredAgentIsOccupied &&
+        (needMoreSleep || offeredAgentIsLessRisky);
+
+      Resources resources(offer.resources());
+
+      // Check if this offer is big enough and if we need to launch anything.
+      if (!resources.toUnreserved().contains(taskResources) ||
+          !needToLaunchTask) {
+        Call call;
+        call.mutable_framework_id()->CopyFrom(framework.id());
+        call.set_type(Call::DECLINE);
+
+        Call::Decline* decline = call.mutable_decline();
+        decline->add_offer_ids()->CopyFrom(offer.id());
+        decline->mutable_filters()->set_refuse_seconds(600);
+
+        mesos->send(call);
+        continue;
+      }
+
+      // Keeping `num_tasks` running has higher priority than migrating tasks.
+      // We only migrate tasks if there are enough running tasks.
+      if (!needMoreSleep && offeredAgentIsLessRisky) {
+        LOG(INFO) << "Migrating task " << sleepers[riskiestAgent.get()].taskId
+                  << " from " << riskiestAgent.get();
+
+        Call call;
+        call.mutable_framework_id()->CopyFrom(framework.id());
+        call.set_type(Call::KILL);
+
+        Call::Kill* kill = call.mutable_kill();
+        kill->mutable_task_id()->CopyFrom(sleepers[riskiestAgent.get()].taskId);
+        kill->mutable_agent_id()->CopyFrom(riskiestAgent.get());
+
+        mesos->send(call);
+
+        // Keep track of this sleeper in another map.
+        migrating[riskiestAgent.get()] = sleepers[riskiestAgent.get()];
+        sleepers.erase(riskiestAgent.get());
+
+        // For simplicity, we only migrate one task per round of offers.
+        // Unsetting the `riskiestAgent` means we will no longer consider
+        // migrating tasks in this loop.
+        riskiestAgent = None();
+
+        // Since we killed a task, we need to start another one.
+        needMoreSleep = true;
+      }
+
+      if (needMoreSleep) {
+        LOG(INFO) << "Starting task " << tasks_launched
+                  << " on " << offer.agent_id();
+
+        TaskInfo task;
+        task.mutable_task_id()->set_value(stringify(tasks_launched));
+        task.set_name("Sleeper Agent " + stringify(tasks_launched++));
+        task.mutable_agent_id()->MergeFrom(offer.agent_id());
+        task.mutable_resources()->CopyFrom(taskResources);
+        task.mutable_command()->set_value(
+            "while [ true ]; do echo ZZZzzz...; sleep 5; done");
+
+        Call call;
+        call.mutable_framework_id()->CopyFrom(framework.id());
+        call.set_type(Call::ACCEPT);
+
+        Call::Accept* accept = call.mutable_accept();
+        accept->add_offer_ids()->CopyFrom(offer.id());
+
+        Offer::Operation* operation = accept->add_operations();
+        operation->set_type(Offer::Operation::LAUNCH);
+        operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+
+        mesos->send(call);
+
+        // Save the new sleep task.
+        SleeperInfo sleeper;
+        sleeper.taskId = task.task_id();
+        if (offer.has_unavailability()) {
+          sleeper.unavailability = offer.unavailability().start();
+        }
+        sleepers[offer.agent_id()] = sleeper;
+      }
+    }
+  }
+
+
+  void inverseOffers(const std::vector<InverseOffer>& offers)
+  {
+    foreach (const InverseOffer& offer, offers) {
+      if (!sleepers.contains(offer.agent_id())) {
+        LOG(INFO) << "Inverse offer received for " << offer.agent_id()
+                  << " which does not hold an active sleep task.";
+        continue;
+      }
+
+      // Take note of any agents that are scheduled for maintenance.
+      sleepers[offer.agent_id()].unavailability =
+        offer.unavailability().start();
+
+      // TODO(josephw): Demonstrate some semantics for declining inverse
+      // offers. This framework currently always accepts inverse offers.
+      Call call;
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      call.set_type(Call::ACCEPT_INVERSE_OFFERS);
+      Call::AcceptInverseOffers* accept = call.mutable_accept_inverse_offers();
+      accept->add_inverse_offer_ids()->CopyFrom(offer.id());
+
+      mesos->send(call);
+    }
+  }
+
+
+  void statusUpdate(const TaskStatus& status)
+  {
+    LOG(INFO)
+      << "Task " << status.task_id().value()
+      << " is in state " << TaskState_Name(status.state())
+      << (status.has_message() ? " with message: " + status.message() : "");
+
+    if (status.has_uuid()) {
+      Call call;
+      call.set_type(Call::ACKNOWLEDGE);
+
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+
+      Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+      acknowledge->mutable_agent_id()->CopyFrom(status.agent_id());
+      acknowledge->mutable_task_id()->CopyFrom(status.task_id());
+      acknowledge->set_uuid(status.uuid());
+
+      mesos->send(call);
+    }
+
+    // A kill issued for a migration is the only expected terminal state.
+    if (migrating.contains(status.agent_id()) &&
+        status.state() == TASK_KILLED) {
+      ++metrics.sleepers_killed;
+
+      migrating.erase(status.agent_id());
+      return;
+    }
+
+    // These are unexpected terminal states.
+    if (status.state() == TASK_FINISHED ||
+        status.state() == TASK_LOST ||
+        status.state() == TASK_FAILED ||
+        status.state() == TASK_ERROR ||
+        status.state() == TASK_KILLED) {
+      ++metrics.sleepers_lost_abnormally;
+
+      sleepers.erase(status.agent_id());
+      migrating.erase(status.agent_id());
+    }
+  }
+
+  void finalize()
+  {
+    if (state == SUBSCRIBED) {
+      Call call;
+      CHECK(framework.has_id());
+      call.mutable_framework_id()->CopyFrom(framework.id());
+      call.set_type(Call::TEARDOWN);
+
+      mesos->send(call);
+    }
+  }
+
+  FrameworkInfo framework;
+  const std::string master;
+  const uint32_t num_tasks;
+  const Option<Credential> credential;
+
+  // Agents which currently hold a sleep task.
+  hashmap<AgentID, SleeperInfo> sleepers;
+  hashmap<AgentID, SleeperInfo> migrating; // Killed, awaiting TASK_KILLED.
+
+  int tasks_launched; // Also the source of new task IDs.
+
+  process::Owned<scheduler::Mesos> mesos;
+
+  enum State
+  {
+    DISCONNECTED,
+    CONNECTED,
+    SUBSCRIBED
+  } state;
+
+  process::Time start_time;
+  double _uptime_secs()
+  {
+    return (Clock::now() - start_time).secs();
+  }
+
+  double _subscribed()
+  {
+    return state == SUBSCRIBED ? 1 : 0;
+  }
+
+  double _current_sleepers()
+  {
+    return sleepers.size() + migrating.size();
+  }
+
+  struct Metrics
+  {
+    Metrics(const InverseOfferScheduler& scheduler)
+      : uptime_secs(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/uptime_secs",
+          defer(scheduler, &InverseOfferScheduler::_uptime_secs)),
+      subscribed(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/subscribed",
+          defer(scheduler, &InverseOfferScheduler::_subscribed)),
+      offers_received(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/offers_received"),
+      inverse_offers_received(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/inverse_offers_received"),
+      sleepers_killed(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/sleepers_killed"),
+      sleepers_lost_abnormally(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/sleepers_lost_abnormally"),
+      current_sleepers(
+          std::string(FRAMEWORK_METRICS_PREFIX) + "/current_sleepers",
+          defer(scheduler, &InverseOfferScheduler::_current_sleepers))
+    {
+      process::metrics::add(uptime_secs);
+      process::metrics::add(subscribed);
+      process::metrics::add(offers_received);
+      process::metrics::add(inverse_offers_received);
+      process::metrics::add(sleepers_killed);
+      process::metrics::add(sleepers_lost_abnormally);
+      process::metrics::add(current_sleepers);
+    }
+
+    ~Metrics()
+    {
+      process::metrics::remove(uptime_secs);
+      process::metrics::remove(subscribed);
+      process::metrics::remove(offers_received);
+      process::metrics::remove(inverse_offers_received);
+      process::metrics::remove(sleepers_killed);
+      process::metrics::remove(sleepers_lost_abnormally);
+      process::metrics::remove(current_sleepers);
+    }
+
+    process::metrics::PullGauge uptime_secs;
+    process::metrics::PullGauge subscribed;
+
+    process::metrics::Counter offers_received;
+    process::metrics::Counter inverse_offers_received;
+
+    // The only expected terminal state is TASK_KILLED.
+    // Other terminal states are considered incorrect.
+    process::metrics::Counter sleepers_killed;
+    process::metrics::Counter sleepers_lost_abnormally;
+
+    process::metrics::PullGauge current_sleepers;
+  } metrics;
+};
+
+
+class Flags : public virtual mesos::internal::examples::Flags
+{
+public:
+  Flags()
+  {
+    add(&Flags::num_tasks,
+        "num_tasks",
+        "Number of sleep tasks to run at once. Each task is started on\n"
+        "a separate machine. The scheduler will attempt to migrate tasks\n"
+        "to other machines ahead of planned maintenance.",
+        1, // Default: a single sleep task.
+        [](int value) -> Option<Error> {
+          if (value <= 0) {
+            return Error("Expected --num_tasks greater than zero");
+          }
+
+          return None();
+        });
+  }
+
+  int num_tasks; // Validated above to be greater than zero.
+};
+
+
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  Try<flags::Warnings> load = flags.load("MESOS_EXAMPLE_", argc, argv);
+
+  if (flags.help) {
+    std::cout << flags.usage() << std::endl;
+    return EXIT_SUCCESS;
+  }
+
+  if (load.isError()) {
+    std::cerr << flags.usage(load.error()) << std::endl;
+    return EXIT_FAILURE;
+  }
+
+  mesos::internal::logging::initialize(argv[0], true, flags); // Catch signals.
+
+  // Log any flag warnings.
+  foreach (const flags::Warning& warning, load->warnings) {
+    LOG(WARNING) << warning.message;
+  }
+
+  // Basic framework description; role and checkpointing come from flags.
+  FrameworkInfo framework;
+  framework.set_user(os::user().get());
+  framework.set_name(FRAMEWORK_NAME);
+  framework.set_role(flags.role);
+  framework.set_checkpoint(flags.checkpoint);
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+
+  Option<Credential> credential = None();
+
+  if (flags.authenticate) {
+    LOG(INFO) << "Enabling authentication for the framework";
+
+    Credential credential_;
+    credential_.set_principal(flags.principal);
+    if (flags.secret.isSome()) {
+      credential_.set_secret(flags.secret.get());
+    }
+    credential = credential_;
+  }
+
+  if (flags.master == "local") {
+    // Configure the local master via MESOS_* environment variables.
+    os::setenv("MESOS_ROLES", flags.role);
+
+    os::setenv(
+        "MESOS_AUTHENTICATE_HTTP_FRAMEWORKS",
+        stringify(flags.authenticate));
+
+    os::setenv("MESOS_HTTP_FRAMEWORK_AUTHENTICATORS", "basic");
+
+    mesos::ACLs acls; // Allow any principal to register with `flags.role`.
+    mesos::ACL::RegisterFramework* acl = acls.add_register_frameworks();
+    acl->mutable_principals()->set_type(mesos::ACL::Entity::ANY);
+    acl->mutable_roles()->add_values(flags.role);
+    os::setenv("MESOS_ACLS", stringify(JSON::protobuf(acls)));
+  }
+
+  process::Owned<InverseOfferScheduler> scheduler(new InverseOfferScheduler(
+      framework,
+      flags.master,
+      flags.num_tasks,
+      credential));
+
+  process::spawn(scheduler.get());
+  process::wait(scheduler.get()); // Returns once the scheduler terminates.
+
+  return EXIT_SUCCESS;
+}