Posted to commits@mesos.apache.org by nn...@apache.org on 2014/01/25 07:41:50 UTC

git commit: Added launchTasks for list of offers

Updated Branches:
  refs/heads/master 1230910a5 -> b609c8514


Added launchTasks for list of offers

Running tasks on more than one offer belonging to a single slave can
be useful in situations with multiple outstanding offers.

This patch extends the usual launchTasks() to accept a vector of
OfferIDs. The previous launchTasks() (accepting a single OfferID) has
been kept for backward compatibility; it now calls the new
launchTasks() with a one-element list. The same applies to the JNI
and Python interfaces, which accept both formats as well.

Offers are verified to belong to the same slave and framework before
their resources are merged and used.
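
For illustration, a minimal sketch of how a framework scheduler might
use the new overload; the helper name and the offer/task variables are
hypothetical and not part of this patch:

    // Sketch: launch a single task across two outstanding offers from
    // the same slave using the new vector-based overload. 'offer1',
    // 'offer2', and 'task' are hypothetical values built elsewhere.
    #include <vector>
    #include <mesos/scheduler.hpp>

    using namespace mesos;

    void launchOnCombinedOffers(
        SchedulerDriver* driver,
        const Offer& offer1,
        const Offer& offer2, // Must come from the same slave as 'offer1'.
        const TaskInfo& task)
    {
      std::vector<OfferID> offerIds;
      offerIds.push_back(offer1.id());
      offerIds.push_back(offer2.id());

      std::vector<TaskInfo> tasks;
      tasks.push_back(task); // May use the merged resources of both offers.

      // The old single-OfferID overload still works and simply forwards
      // to this one with a one-element list.
      driver->launchTasks(offerIds, tasks);
    }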

Review: https://reviews.apache.org/r/14669


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b609c851
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b609c851
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b609c851

Branch: refs/heads/master
Commit: b609c851493c81c6ba8dfe51cf102400c05c2d0c
Parents: 1230910
Author: Niklas Q. Nielsen <ni...@mesosphere.io>
Authored: Fri Jan 24 18:34:24 2014 -0800
Committer: Niklas Q. Nielsen <ni...@mesosphere.io>
Committed: Fri Jan 24 18:48:33 2014 -0800

----------------------------------------------------------------------
 include/mesos/scheduler.hpp                     |  23 +-
 .../org_apache_mesos_MesosSchedulerDriver.cpp   |  80 +-
 .../org/apache/mesos/MesosSchedulerDriver.java  |  17 +
 .../src/org/apache/mesos/SchedulerDriver.java   |  34 +-
 src/master/master.cpp                           | 830 +++++++++++--------
 src/master/master.hpp                           |  19 +-
 src/messages/messages.proto                     |   3 +-
 .../native/mesos_scheduler_driver_impl.cpp      |  35 +-
 src/sched/sched.cpp                             |  63 +-
 src/tests/master_tests.cpp                      | 299 +++++++
 src/tests/resource_offers_tests.cpp             |   4 +-
 11 files changed, 1018 insertions(+), 389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/include/mesos/scheduler.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/scheduler.hpp b/include/mesos/scheduler.hpp
index 8063997..2e4707e 100644
--- a/include/mesos/scheduler.hpp
+++ b/include/mesos/scheduler.hpp
@@ -242,11 +242,17 @@ public:
    * not used by the tasks or their executors) will be considered
    * declined. The specified filters are applied on all unused
    * resources (see mesos.proto for a description of Filters).
+   * Available resources are aggregated when multiple offers are
+   * provided. Note that all offers must belong to the same slave.
    * Invoking this function with an empty collection of tasks declines
-   * this offer in its entirety (see Scheduler::declineOffer). Note
-   * that currently tasks can only be launched per offer. In the
-   * future, frameworks will be allowed to aggregate offers
-   * (resources) to launch their tasks.
+   * offers in their entirety (see Scheduler::declineOffer).
+   */
+  virtual Status launchTasks(const std::vector<OfferID>& offerIds,
+                             const std::vector<TaskInfo>& tasks,
+                             const Filters& filters = Filters()) = 0;
+
+  /**
+   * DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
    */
   virtual Status launchTasks(const OfferID& offerId,
                              const std::vector<TaskInfo>& tasks,
@@ -371,9 +377,18 @@ public:
   virtual Status join();
   virtual Status run();
   virtual Status requestResources(const std::vector<Request>& requests);
+
+  /**
+   * TODO(nnielsen): launchTasks using single offer is deprecated.
+   * Use launchTasks with offer list instead.
+   */
   virtual Status launchTasks(const OfferID& offerId,
                              const std::vector<TaskInfo>& tasks,
                              const Filters& filters = Filters());
+
+  virtual Status launchTasks(const std::vector<OfferID>& offerIds,
+                             const std::vector<TaskInfo>& tasks,
+                             const Filters& filters = Filters());
   virtual Status killTask(const TaskID& taskId);
   virtual Status declineOffer(const OfferID& offerId,
                               const Filters& filters = Filters());

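As the updated comment notes, calling launchTasks() with an empty task
vector now declines all of the given offers at once. A minimal sketch,
with a hypothetical helper name and an illustrative filter value:

    #include <vector>
    #include <mesos/scheduler.hpp>

    using namespace mesos;

    // Decline several offers in their entirety by "launching" an empty
    // task list; this mirrors what declineOffer() does for one offer.
    void declineAll(
        SchedulerDriver* driver,
        const std::vector<OfferID>& offerIds)
    {
      Filters filters;
      filters.set_refuse_seconds(5); // Illustrative refusal interval.

      driver->launchTasks(offerIds, std::vector<TaskInfo>(), filters);
    }
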
http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
index 5d6492d..d2369ac 100644
--- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
+++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp
@@ -706,10 +706,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_killTask
 /*
  * Class:     org_apache_mesos_MesosSchedulerDriver
  * Method:    launchTasks
- * Signature: (Lorg/apache/mesos/Protos/OfferID;Ljava/util/Collection;Lorg/apache/mesos/Protos/Filters;)Lorg/apache/mesos/Protos/Status;
+ * Signature: (Lorg/apache/mesos/Protos$OfferID;Ljava/util/Collection;Lorg/apache/mesos/Protos$Filters;)Lorg/apache/mesos/Protos$Status;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks
-  (JNIEnv* env, jobject thiz, jobject jofferId, jobject jtasks, jobject jfilters)
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks__Lorg_apache_mesos_Protos_00024OfferID_2Ljava_util_Collection_2Lorg_apache_mesos_Protos_00024Filters_2
+  (JNIEnv* env, jobject thiz, jobject jofferId, jobject jtasks, jobject jfilters)
 {
   // Construct a C++ OfferID from the Java OfferID.
   const OfferID& offerId = construct<OfferID>(env, jofferId);
@@ -748,7 +748,79 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks
   MesosSchedulerDriver* driver =
     (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
 
-  Status status = driver->launchTasks(offerId, tasks, filters);
+  vector<OfferID> offerIds;
+  offerIds.push_back(offerId);
+
+  Status status = driver->launchTasks(offerIds, tasks, filters);
+
+  return convert<Status>(env, status);
+}
+
+
+/*
+ * Class:     org_apache_mesos_MesosSchedulerDriver
+ * Method:    launchTasks
+ * Signature: (Ljava/util/Collection;Ljava/util/Collection;Lorg/apache/mesos/Protos$Filters;)Lorg/apache/mesos/Protos$Status;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks__Ljava_util_Collection_2Ljava_util_Collection_2Lorg_apache_mesos_Protos_00024Filters_2
+  (JNIEnv* env, jobject thiz, jobject jofferIds, jobject jtasks, jobject jfilters)
+{
+  // Construct a C++ OfferID from each Java OfferID.
+  vector<OfferID> offers;
+  jclass clazz = env->GetObjectClass(jofferIds);
+
+  // Iterator iterator = offerIds.iterator();
+  jmethodID iterator =
+    env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+  jobject jiterator = env->CallObjectMethod(jofferIds, iterator);
+
+  clazz = env->GetObjectClass(jiterator);
+
+  // while (iterator.hasNext()) {
+  jmethodID hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+  jmethodID next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+  while (env->CallBooleanMethod(jiterator, hasNext)) {
+    // Object offerId = iterator.next();
+    jobject jofferId = env->CallObjectMethod(jiterator, next);
+    const OfferID& offerId = construct<OfferID>(env, jofferId);
+    offers.push_back(offerId);
+  }
+
+  // Construct a C++ TaskInfo from each Java TaskInfo.
+  vector<TaskInfo> tasks;
+  clazz = env->GetObjectClass(jtasks);
+
+  // Iterator iterator = tasks.iterator();
+  iterator = env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;");
+  jiterator = env->CallObjectMethod(jtasks, iterator);
+
+  clazz = env->GetObjectClass(jiterator);
+
+  // while (iterator.hasNext()) {
+  hasNext = env->GetMethodID(clazz, "hasNext", "()Z");
+
+  next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;");
+
+  while (env->CallBooleanMethod(jiterator, hasNext)) {
+    // Object task = iterator.next();
+    jobject jtask = env->CallObjectMethod(jiterator, next);
+    const TaskInfo& task = construct<TaskInfo>(env, jtask);
+    tasks.push_back(task);
+  }
+
+  // Construct a C++ Filters from the Java Filters.
+  const Filters& filters = construct<Filters>(env, jfilters);
+
+  // Now invoke the underlying driver.
+  clazz = env->GetObjectClass(thiz);
+
+  jfieldID __driver = env->GetFieldID(clazz, "__driver", "J");
+  MesosSchedulerDriver* driver =
+    (MesosSchedulerDriver*) env->GetLongField(thiz, __driver);
+
+  Status status = driver->launchTasks(offers, tasks, filters);
 
   return convert<Status>(env, status);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
index ed4b4a3..3fd1eb5 100644
--- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java
@@ -20,8 +20,10 @@ package org.apache.mesos;
 
 import org.apache.mesos.Protos.*;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 
@@ -146,15 +148,30 @@ public class MesosSchedulerDriver implements SchedulerDriver {
 
   public native Status requestResources(Collection<Request> requests);
 
+  /**
+   * @deprecated Replaced by launchTasks using offer list.
+   */
   public Status launchTasks(OfferID offerId,
                             Collection<TaskInfo> tasks) {
     return launchTasks(offerId, tasks, Filters.newBuilder().build());
   }
 
+  /**
+   * @deprecated Replaced by launchTasks using offer list.
+   */
   public native Status launchTasks(OfferID offerId,
                                    Collection<TaskInfo> tasks,
                                    Filters filters);
 
+  public Status launchTasks(Collection<OfferID> offerIds,
+                            Collection<TaskInfo> tasks) {
+    return launchTasks(offerIds, tasks, Filters.newBuilder().build());
+  }
+
+  public native Status launchTasks(Collection<OfferID> offerIds,
+                                   Collection<TaskInfo> tasks,
+                                   Filters filters);
+
   public native Status killTask(TaskID taskId);
 
   public Status declineOffer(OfferID offerId) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/java/src/org/apache/mesos/SchedulerDriver.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java
index 5b0ca39..6bca879 100644
--- a/src/java/src/org/apache/mesos/SchedulerDriver.java
+++ b/src/java/src/org/apache/mesos/SchedulerDriver.java
@@ -86,23 +86,39 @@ public interface SchedulerDriver {
   Status requestResources(Collection<Request> requests);
 
   /**
-   * Launches the given set of tasks. Any resources remaining (i.e.,
-   * not used by the tasks or their executors) will be considered
-   * declined. The specified filters are applied on all unused
-   * resources (see mesos.proto for a description of Filters).
+   * Launches the given set of tasks on a set of offers. Resources
+   * from offers are aggregated when more than one is provided.
+   * Note that all offers must belong to the same slave. Any resources
+   * remaining (i.e., not used by the tasks or their executors) will
+   * be considered declined. The specified filters are applied on all
+   * unused resources (see mesos.proto for a description of Filters).
    * Invoking this function with an empty collection of tasks declines
-   * this offer in its entirety (see {@link #declineOffer}. Note that
-   * currently tasks can only be launched per offer. In the future,
-   * frameworks will be allowed to aggregate offers (resources) to
-   * launch their tasks.
+   * offers in their entirety (see {@link #declineOffer}).
    */
-  Status launchTasks(OfferID offerId,
+  Status launchTasks(Collection<OfferID> offerIds,
                      Collection<TaskInfo> tasks,
                      Filters filters);
 
   /**
    * Launches the given set of tasks. See above for details.
    */
+  Status launchTasks(Collection<OfferID> offerIds, Collection<TaskInfo> tasks);
+
+  /**
+   * @deprecated Use launchTasks(
+   *                     Collection<OfferID> offerIds,
+   *                     Collection<TaskInfo> tasks,
+   *                     Filters filters) instead.
+   */
+  Status launchTasks(OfferID offerId,
+                     Collection<TaskInfo> tasks,
+                     Filters filters);
+
+  /**
+   * @deprecated Use launchTasks(
+   *                     Collection<OfferID> offerIds,
+   *                     Collection<TaskInfo> tasks) instead.
+   */
   Status launchTasks(OfferID offerId, Collection<TaskInfo> tasks);
 
   /**

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index c7d9186..77872ec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -452,7 +452,8 @@ void Master::initialize()
       &LaunchTasksMessage::framework_id,
       &LaunchTasksMessage::offer_id,
       &LaunchTasksMessage::tasks,
-      &LaunchTasksMessage::filters);
+      &LaunchTasksMessage::filters,
+      &LaunchTasksMessage::offer_ids);
 
   install<ReviveOffersMessage>(
       &Master::reviveOffers,
@@ -1088,18 +1089,336 @@ void Master::resourceRequest(
 }
 
 
+// We use the visitor pattern to abstract the process of performing
+// any validations, aggregations, etc. of tasks that a framework
+// attempts to run within the resources provided by offers. A
+// visitor can return an optional error (typedef'ed as an option of a
+// string) which will cause the master to send a failed status update
+// back to the framework for only that task description. An instance
+// will be reused for each task description from the same
+// 'launchTasks()' call, but not across different calls.
+typedef Option<string> TaskInfoError;
+
+struct TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave) = 0;
+
+  virtual ~TaskInfoVisitor() {}
+};
+
+
+// Checks that the slave ID used by a task is correct.
+struct SlaveIDChecker : TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave)
+  {
+    if (!(task.slave_id() == slave.id)) {
+      return "Task uses invalid slave " + task.slave_id().value() +
+          " while slave " + slave.id.value() + " is expected";
+    }
+
+    return None();
+  }
+};
+
+
+// Checks that each task uses a unique ID. Regardless of whether a
+// task actually gets launched (for example, another checker may
+// return an error for a task), we always consider it an error when a
+// task tries to re-use an ID.
+struct UniqueTaskIDChecker : TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave)
+  {
+    const TaskID& taskId = task.task_id();
+
+    if (ids.contains(taskId) || framework.tasks.contains(taskId)) {
+      return "Task has duplicate ID: " + taskId.value();
+    }
+
+    ids.insert(taskId);
+
+    return None();
+  }
+
+  hashset<TaskID> ids;
+};
+
+
+// Checks that the resources used by a task (and its executor if
+// necessary) on a slave do not exceed the total resources
+// offered on that slave.
+struct ResourceUsageChecker : TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave)
+  {
+    if (task.resources().size() == 0) {
+      return stringify("Task uses no resources");
+    }
+
+    foreach (const Resource& resource, task.resources()) {
+      if (!Resources::isAllocatable(resource)) {
+        return "Task uses invalid resources: " + stringify(resource);
+      }
+    }
+
+    // Check if this task uses more resources than offered.
+    Resources taskResources = task.resources();
+
+    if (!((usedResources + taskResources) <= resources)) {
+      return "Task " + stringify(task.task_id()) + " attempted to use " +
+          stringify(taskResources) + " combined with already used " +
+          stringify(usedResources) + " is greater than offered " +
+          stringify(resources);
+    }
+
+    // Check this task's executor's resources.
+    if (task.has_executor()) {
+      // TODO(benh): Check that the executor uses some resources.
+
+      foreach (const Resource& resource, task.executor().resources()) {
+        if (!Resources::isAllocatable(resource)) {
+          // TODO(benh): Send back the invalid resources?
+          return "Executor for task " + stringify(task.task_id()) +
+              " uses invalid resources " + stringify(resource);
+        }
+      }
+
+      // Check if this task's executor is running, and if not check if
+      // the task + the executor use more resources than offered.
+      if (!executors.contains(task.executor().executor_id())) {
+        if (!slave.hasExecutor(framework.id, task.executor().executor_id())) {
+          taskResources += task.executor().resources();
+          if (!((usedResources + taskResources) <= resources)) {
+            return "Task " + stringify(task.task_id()) + " + executor attempted" +
+                " to use " + stringify(taskResources) + " combined with" +
+                " already used " + stringify(usedResources) + " is greater" +
+                " than offered " + stringify(resources);
+          }
+        }
+        executors.insert(task.executor().executor_id());
+      }
+    }
+
+    usedResources += taskResources;
+
+    return None();
+  }
+
+  Resources usedResources;
+  hashset<ExecutorID> executors;
+};
+
+
+// Checks that tasks that use the "same" executor (i.e., same
+// ExecutorID) have an identical ExecutorInfo.
+struct ExecutorInfoChecker : TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave)
+  {
+    if (task.has_executor() == task.has_command()) {
+      return stringify(
+          "Task should have at least one (but not both) of CommandInfo or"
+          " ExecutorInfo present");
+    }
+
+    if (task.has_executor()) {
+      if (slave.hasExecutor(framework.id, task.executor().executor_id())) {
+        const Option<ExecutorInfo> executorInfo =
+          slave.executors.get(framework.id).get().get(task.executor().executor_id());
+
+        if (!(task.executor() == executorInfo.get())) {
+          return "Task has invalid ExecutorInfo (existing ExecutorInfo"
+              " with same ExecutorID is not compatible).\n"
+              "------------------------------------------------------------\n"
+              "Existing ExecutorInfo:\n" +
+              stringify(executorInfo.get()) + "\n"
+              "------------------------------------------------------------\n"
+              "Task's ExecutorInfo:\n" +
+              stringify(task.executor()) + "\n"
+              "------------------------------------------------------------\n";
+        }
+      }
+    }
+
+    return None();
+  }
+};
+
+
+// Checks that a task that asks for checkpointing is not being
+// launched on a slave that has not enabled checkpointing.
+struct CheckpointChecker : TaskInfoVisitor
+{
+  virtual TaskInfoError operator () (
+      const TaskInfo& task,
+      const Resources& resources,
+      const Framework& framework,
+      const Slave& slave)
+  {
+    if (framework.info.checkpoint() && !slave.info.checkpoint()) {
+      return "Task asked to be checkpointed but slave " +
+          stringify(slave.id) + " has checkpointing disabled";
+    }
+    return None();
+  }
+};
+
+
+// OfferVisitors are similar to the TaskInfoVisitor pattern and
+// are used for validation and aggregation of offers.
+// The error reporting scheme is also similar to TaskInfoVisitor.
+// However, offer processing (and subsequent task processing) is
+// aborted altogether if an offer visitor reports an error.
+typedef Option<string> OfferError;
+
+struct OfferVisitor
+{
+  virtual OfferError operator () (
+      const OfferID& offerId,
+      const Framework& framework,
+      Master* master) = 0;
+
+  virtual ~OfferVisitor() {}
+
+  Slave* getSlave(Master* master, const SlaveID& id) {
+    return master->getSlave(id);
+  }
+
+  Offer* getOffer(Master* master, const OfferID& id) {
+    return master->getOffer(id);
+  }
+};
+
+
+// Checks validity/liveness of an offer.
+struct ValidOfferChecker : OfferVisitor {
+  virtual OfferError operator () (
+      const OfferID& offerId,
+      const Framework& framework,
+      Master* master)
+  {
+    Offer* offer = getOffer(master, offerId);
+    if (offer == NULL) {
+      return "Offer " + stringify(offerId) + " is no longer valid";
+    }
+
+    return None();
+  }
+};
+
+
+// Checks that an offer belongs to the expected framework.
+struct FrameworkChecker : OfferVisitor {
+  virtual OfferError operator () (
+      const OfferID& offerId,
+      const Framework& framework,
+      Master* master)
+  {
+    Offer* offer = getOffer(master, offerId);
+    if (!(framework.id == offer->framework_id())) {
+      return "Offer " + stringify(offer->id()) +
+          " has invalid framework " + stringify(offer->framework_id()) +
+          " while framework " + stringify(framework.id) + " is expected";
+    }
+
+    return None();
+  }
+};
+
+
+// Checks that the slave is valid and ensures that all offers belong to
+// the same slave.
+struct SlaveChecker : OfferVisitor
+{
+  virtual OfferError operator () (
+      const OfferID& offerId,
+      const Framework& framework,
+      Master* master)
+  {
+    Offer* offer = getOffer(master, offerId);
+    Slave* slave = getSlave(master, offer->slave_id());
+    if (slave == NULL) {
+      return "Offer " + stringify(offerId) +
+          " outlived slave " + stringify(offer->slave_id());
+    }
+
+    CHECK(!slave->disconnected)
+      << "Offer " + stringify(offerId)
+      << " outlived disconnected slave " << stringify(slave->id);
+
+    if (slaveId.isNone()) {
+      // Set slave id and use as base case for validation.
+      slaveId = slave->id;
+    } else if (!(slave->id == slaveId.get())) {
+      return "Aggregated offers must belong to one single slave. Offer " +
+          stringify(offerId) + " uses slave " +
+          stringify(slave->id) + " and slave " +
+          stringify(slaveId.get());
+    }
+
+    return None();
+  }
+
+  Option<const SlaveID> slaveId;
+};
+
+
+// Checks that an offer only appears once in the offer list.
+struct UniqueOfferIDChecker : OfferVisitor
+{
+  virtual OfferError operator () (
+      const OfferID& offerId,
+      const Framework& framework,
+      Master* master)
+  {
+    if (offers.contains(offerId)) {
+      return "Duplicate offer " + stringify(offerId) + " in offer list";
+    }
+    offers.insert(offerId);
+
+    return None();
+  }
+
+  hashset<OfferID> offers;
+};
+
+
 void Master::launchTasks(
     const UPID& from,
     const FrameworkID& frameworkId,
     const OfferID& offerId,
     const vector<TaskInfo>& tasks,
-    const Filters& filters)
+    const Filters& filters,
+    const vector<OfferID>& _offerIds)
 {
   Framework* framework = getFramework(frameworkId);
 
   if (framework == NULL) {
     LOG(WARNING)
-      << "Ignoring launch tasks message for offer " << offerId
+      << "Ignoring launch tasks message for offer "
+      << stringify(_offerIds.empty() ? stringify(offerId)
+                                     : stringify(_offerIds))
       << " of framework " << frameworkId
       << " because the framework cannot be found";
     return;
@@ -1107,57 +1426,196 @@ void Master::launchTasks(
 
   if (from != framework->pid) {
     LOG(WARNING)
-      << "Ignoring launch tasks message for offer " << offerId
+      << "Ignoring launch tasks message for offer "
+      << stringify(_offerIds.empty() ? stringify(offerId)
+                                     : stringify(_offerIds))
       << " of framework " << frameworkId << " from '" << from
       << "' because it is not from the registered framework '"
       << framework->pid << "'";
     return;
   }
 
-  // TODO(benh): Support offer "hoarding" and allow multiple offers
-  // *from the same slave* to be used to launch tasks. This can be
-  // accomplished rather easily by collecting and merging all offers
-  // into a mega-offer and passing that offer to
-  // Master::processTasks.
-  Offer* offer = getOffer(offerId);
-  if (offer != NULL) {
-    CHECK_EQ(offer->framework_id(), frameworkId)
-        << "Offer " << offerId
-        << " has invalid frameworkId " << offer->framework_id();
+  // Support a single offerId for backward compatibility;
+  // '_offerIds' is ignored if 'offerId' is set.
+  vector<OfferID> offerIds;
+  if (offerId.has_value()) {
+    offerIds.push_back(offerId);
+  } else if (_offerIds.size() > 0) {
+    offerIds = _offerIds;
+  } else {
+    LOG(WARNING) << "No offers to launch tasks on";
 
-    Slave* slave = getSlave(offer->slave_id());
-    CHECK(slave != NULL)
-      << "Offer " << offerId << " outlived  slave "
-      << slave->id << " (" << slave->info.hostname() << ")";
+    foreach (const TaskInfo& task, tasks) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          "Task launched without offers");
 
-    // If a slave is disconnected we should've removed its offers.
-    CHECK(!slave->disconnected)
-      << "Offer " << offerId << " outlived disconnected slave "
-      << slave->id << " (" << slave->info.hostname() << ")";
+      LOG(INFO) << "Sending status update " << update
+                << " for launch task attempt without offers";
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+    return;
+  }
+
+  // Common slave id for task validation.
+  Option<SlaveID> slaveId;
+
+  // Create offer visitors.
+  list<OfferVisitor*> offerVisitors;
+  offerVisitors.push_back(new ValidOfferChecker());
+  offerVisitors.push_back(new FrameworkChecker());
+  offerVisitors.push_back(new SlaveChecker());
+  offerVisitors.push_back(new UniqueOfferIDChecker());
+
+  // Verify and aggregate all offers.
+  // Abort offer and task processing if any offer validation fails.
+  Resources totalResources;
+  OfferError offerError = None();
+  foreach (const OfferID& offerId, offerIds) {
+    foreach (OfferVisitor* visitor, offerVisitors) {
+      offerError = (*visitor)(offerId, *framework, this);
+      if (offerError.isSome()) {
+        break;
+      }
+    }
+    // An offer validation error needs to be propagated from the
+    // visitor loop above.
+    if (offerError.isSome()) {
+      break;
+    }
+
+    // If offer validation succeeds, we need to pass along the common
+    // slave. So optimistically, we store the first slave id we see.
+    // In case of invalid offers (different slaves, for example), we
+    // report an error and return from launchTasks before slaveId is used.
+    if (slaveId.isNone()) {
+      slaveId = getOffer(offerId)->slave_id();
+    }
+
+    totalResources += getOffer(offerId)->resources();
+  }
+
+  // Cleanup visitors.
+  while (!offerVisitors.empty()) {
+    OfferVisitor* visitor = offerVisitors.front();
+    offerVisitors.pop_front();
+    delete visitor;
+  }
+
+  // Remove offers.
+  foreach (const OfferID& offerId, offerIds) {
+    Offer* offer = getOffer(offerId);
+    // Explicit check needed if an offerId appears more
+    // than once in offerIds.
+    if (offer != NULL) {
+      removeOffer(offer);
+    }
+  }
+
+  if (offerError.isSome()) {
+    LOG(WARNING) << "Failed to validate offers " << stringify(offerIds)
+                 << " : " << offerError.get();
 
-    processTasks(offer, framework, slave, tasks, filters);
-  } else {
-    // The offer is gone (possibly rescinded, lost slave, re-reply
-    // to same offer, etc). Report all tasks in it as failed.
-    // TODO: Consider adding a new task state TASK_INVALID for
-    // situations like these.
-    LOG(WARNING) << "Offer " << offerId << " is no longer valid";
     foreach (const TaskInfo& task, tasks) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          task.slave_id(),
+          task.task_id(),
+          TASK_LOST,
+          "Task launched with invalid offers: " + offerError.get());
+
+      LOG(INFO) << "Sending status update " << update
+                << " for launch task attempt on invalid offers: "
+                << stringify(offerIds);
+      StatusUpdateMessage message;
+      message.mutable_update()->CopyFrom(update);
+      send(framework->pid, message);
+    }
+
+    return;
+  }
+
+  CHECK(slaveId.isSome()) << "Slave id not found";
+  Slave* slave = CHECK_NOTNULL(getSlave(slaveId.get()));
+
+  LOG(INFO) << "Processing reply for offers: "
+            << stringify(offerIds)
+            << " on slave " << slave->id
+            << " (" << slave->info.hostname() << ")"
+            << " for framework " << framework->id;
+
+  Resources usedResources; // Accumulated resources used.
+
+  // Create task visitors.
+  list<TaskInfoVisitor*> taskVisitors;
+  taskVisitors.push_back(new SlaveIDChecker());
+  taskVisitors.push_back(new UniqueTaskIDChecker());
+  taskVisitors.push_back(new ResourceUsageChecker());
+  taskVisitors.push_back(new ExecutorInfoChecker());
+  taskVisitors.push_back(new CheckpointChecker());
+
+  // Loop through each task and check its validity.
+  foreach (const TaskInfo& task, tasks) {
+    // Possible error found while checking task's validity.
+    TaskInfoError error = None();
+
+    // Invoke each visitor.
+    foreach (TaskInfoVisitor* visitor, taskVisitors) {
+      error = (*visitor)(task, totalResources, *framework, *slave);
+      if (error.isSome()) {
+        break;
+      }
+    }
+
+    if (error.isNone()) {
+      // Task looks good, get it running!
+      usedResources += launchTask(task, framework, slave);
+    } else {
+      // Error validating task, send a failed status update.
+      LOG(WARNING) << "Failed to validate task " << task.task_id()
+                   << " : " << error.get();
+
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          framework->id,
+          slave->id,
+          task.task_id(),
+          TASK_LOST,
+          error.get());
+
+      LOG(INFO) << "Sending status update "
+                << update << " for invalid task";
       StatusUpdateMessage message;
-      StatusUpdate* update = message.mutable_update();
-      update->mutable_framework_id()->MergeFrom(frameworkId);
-      TaskStatus* status = update->mutable_status();
-      status->mutable_task_id()->MergeFrom(task.task_id());
-      status->set_state(TASK_LOST);
-      status->set_message("Task launched with invalid offer");
-      update->set_timestamp(Clock::now().secs());
-      update->set_uuid(UUID::random().toBytes());
-
-      LOG(INFO) << "Sending status update " << *update
-                << " for launch task attempt on invalid offer " << offerId;
+      message.mutable_update()->CopyFrom(update);
       send(framework->pid, message);
     }
   }
+
+  // All used resources should be allocatable, enforced by our validators.
+  CHECK_EQ(usedResources, usedResources.allocatable());
+
+  // Calculate unused resources.
+  Resources unusedResources = totalResources - usedResources;
+
+  if (unusedResources.allocatable().size() > 0) {
+    // Tell the allocator about the unused (e.g., refused) resources.
+    allocator->resourcesUnused(
+        framework->id,
+        slave->id,
+        unusedResources,
+        filters);
+  }
+
+  // Cleanup visitors.
+  while (!taskVisitors.empty()) {
+    TaskInfoVisitor* visitor = taskVisitors.front();
+    taskVisitors.pop_front();
+    delete visitor;
+  }
 }
 
 
@@ -1909,298 +2367,6 @@ vector<Framework*> Master::getActiveFrameworks() const
 }
 
 
-// We use the visitor pattern to abstract the process of performing
-// any validations, aggregations, etc. of tasks that a framework
-// attempts to run within the resources provided by an offer. A
-// visitor can return an optional error (typedef'ed as an option of a
-// string) which will cause the master to send a failed status update
-// back to the framework for only that task description. An instance
-// will be reused for each task description from same offer, but not
-// for task descriptions from different offers.
-typedef Option<string> TaskInfoError;
-
-struct TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave) = 0;
-
-  virtual ~TaskInfoVisitor() {}
-};
-
-
-// Checks that the slave ID used by a task is correct.
-struct SlaveIDChecker : TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave)
-  {
-    if (!(task.slave_id() == slave->id)) {
-      return TaskInfoError::some(
-          "Task uses invalid slave: " + task.slave_id().value());
-    }
-
-    return TaskInfoError::none();
-  }
-};
-
-
-// Checks that each task uses a unique ID. Regardless of whether a
-// task actually gets launched (for example, another checker may
-// return an error for a task), we always consider it an error when a
-// task tries to re-use an ID.
-struct UniqueTaskIDChecker : TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave)
-  {
-    const TaskID& taskId = task.task_id();
-
-    if (ids.contains(taskId) || framework->tasks.contains(taskId)) {
-      return TaskInfoError::some(
-          "Task has duplicate ID: " + taskId.value());
-    }
-
-    ids.insert(taskId);
-
-    return TaskInfoError::none();
-  }
-
-  hashset<TaskID> ids;
-};
-
-
-// Checks that the used resources by a task (and executor if
-// necessary) on each slave does not exceed the total resources
-// offered on that slave
-struct ResourceUsageChecker : TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave)
-  {
-    if (task.resources().size() == 0) {
-      return TaskInfoError::some("Task uses no resources");
-    }
-
-    foreach (const Resource& resource, task.resources()) {
-      if (!Resources::isAllocatable(resource)) {
-        // TODO(benh): Send back the invalid resources?
-        return TaskInfoError::some("Task uses invalid resources");
-      }
-    }
-
-    // Check if this task uses more resources than offered.
-    Resources taskResources = task.resources();
-
-    if (!((usedResources + taskResources) <= offer->resources())) {
-      return TaskInfoError::some(
-          "Task " + stringify(task.task_id()) + " attempted to use " +
-          stringify(taskResources) + " combined with already used " +
-          stringify(usedResources) + " is greater than offered " +
-          stringify(offer->resources()));
-    }
-
-    // Check this task's executor's resources.
-    if (task.has_executor()) {
-      // TODO(benh): Check that the executor uses some resources.
-
-      foreach (const Resource& resource, task.executor().resources()) {
-        if (!Resources::isAllocatable(resource)) {
-          // TODO(benh): Send back the invalid resources?
-          return TaskInfoError::some(
-              "Executor for task " + stringify(task.task_id()) +
-              " uses invalid resources " + stringify(resource));
-        }
-      }
-
-      // Check if this task's executor is running, and if not check if
-      // the task + the executor use more resources than offered.
-      if (!executors.contains(task.executor().executor_id())) {
-        if (!slave->hasExecutor(framework->id, task.executor().executor_id())) {
-          taskResources += task.executor().resources();
-          if (!((usedResources + taskResources) <= offer->resources())) {
-            return TaskInfoError::some(
-                "Task " + stringify(task.task_id()) + " + executor attempted" +
-                " to use " + stringify(taskResources) + " combined with" +
-                " already used " + stringify(usedResources) + " is greater" +
-                " than offered " + stringify(offer->resources()));
-          }
-        }
-        executors.insert(task.executor().executor_id());
-      }
-    }
-
-    usedResources += taskResources;
-
-    return TaskInfoError::none();
-  }
-
-  Resources usedResources;
-  hashset<ExecutorID> executors;
-};
-
-
-// Checks that tasks that use the "same" executor (i.e., same
-// ExecutorID) have an identical ExecutorInfo.
-struct ExecutorInfoChecker : TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave)
-  {
-    if (task.has_executor() == task.has_command()) {
-      return TaskInfoError::some(
-          "Task should have at least one (but not both) of CommandInfo or"
-          " ExecutorInfo present");
-    }
-
-    if (task.has_executor()) {
-      if (slave->hasExecutor(framework->id, task.executor().executor_id())) {
-        const ExecutorInfo& executorInfo =
-          slave->executors[framework->id][task.executor().executor_id()];
-        if (!(task.executor() == executorInfo)) {
-          return TaskInfoError::some(
-              "Task has invalid ExecutorInfo (existing ExecutorInfo"
-              " with same ExecutorID is not compatible).\n"
-              "------------------------------------------------------------\n"
-              "Existing ExecutorInfo:\n" +
-              stringify(executorInfo) + "\n"
-              "------------------------------------------------------------\n"
-              "Task's ExecutorInfo:\n" +
-              stringify(task.executor()) + "\n"
-              "------------------------------------------------------------\n");
-        }
-      }
-    }
-
-    return TaskInfoError::none();
-  }
-};
-
-
-// Checks that a task that asks for checkpointing is not being
-// launched on a slave that has not enabled checkpointing.
-// TODO(vinod): Consider not offering resources for non-checkpointing
-// slaves to frameworks that need checkpointing.
-struct CheckpointChecker : TaskInfoVisitor
-{
-  virtual TaskInfoError operator () (
-      const TaskInfo& task,
-      Offer* offer,
-      Framework* framework,
-      Slave* slave)
-  {
-    if (framework->info.checkpoint() && !slave->info.checkpoint()) {
-      return TaskInfoError::some(
-          "Task asked to be checkpointed but the slave "
-          "has checkpointing disabled");
-    }
-    return TaskInfoError::none();
-  }
-};
-
-
-// Process a resource offer reply (for a non-cancelled offer) by
-// launching the desired tasks (if the offer contains a valid set of
-// tasks) and reporting used resources to the allocator.
-void Master::processTasks(Offer* offer,
-                          Framework* framework,
-                          Slave* slave,
-                          const vector<TaskInfo>& tasks,
-                          const Filters& filters)
-{
-  CHECK_NOTNULL(offer);
-  CHECK_NOTNULL(framework);
-  CHECK_NOTNULL(slave);
-
-  LOG(INFO) << "Processing reply for offer " << offer->id()
-            << " on slave " << slave->id
-            << " (" << slave->info.hostname() << ")"
-            << " for framework " << framework->id;
-
-  Resources usedResources; // Accumulated resources used from this offer.
-
-  // Create task visitors.
-  list<TaskInfoVisitor*> visitors;
-  visitors.push_back(new SlaveIDChecker());
-  visitors.push_back(new UniqueTaskIDChecker());
-  visitors.push_back(new ResourceUsageChecker());
-  visitors.push_back(new ExecutorInfoChecker());
-  visitors.push_back(new CheckpointChecker());
-
-  // Loop through each task and check it's validity.
-  foreach (const TaskInfo& task, tasks) {
-    // Possible error found while checking task's validity.
-    TaskInfoError error = TaskInfoError::none();
-
-    // Invoke each visitor.
-    foreach (TaskInfoVisitor* visitor, visitors) {
-      error = (*visitor)(task, offer, framework, slave);
-      if (error.isSome()) {
-        break;
-      }
-    }
-
-    if (error.isNone()) {
-      // Task looks good, get it running!
-      usedResources += launchTask(task, framework, slave);
-    } else {
-      // Error validating task, send a failed status update.
-      LOG(WARNING) << "Failed to validate task " << task.task_id()
-                   << " : " << error.get();
-
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          framework->id,
-          slave->id,
-          task.task_id(),
-          TASK_LOST,
-          error.get());
-
-      LOG(INFO) << "Sending status update " << update << " for invalid task";
-      StatusUpdateMessage message;
-      message.mutable_update()->CopyFrom(update);
-      send(framework->pid, message);
-    }
-  }
-
-  // Cleanup visitors.
-  do {
-    TaskInfoVisitor* visitor = visitors.front();
-    visitors.pop_front();
-    delete visitor;
-  } while (!visitors.empty());
-
-  // All used resources should be allocatable, enforced by our validators.
-  CHECK_EQ(usedResources, usedResources.allocatable());
-
-  // Calculate unused resources.
-  Resources unusedResources = offer->resources() - usedResources;
-
-  if (unusedResources.allocatable().size() > 0) {
-    // Tell the allocator about the unused (e.g., refused) resources.
-    allocator->resourcesUnused(offer->framework_id(),
-                               offer->slave_id(),
-                               unusedResources,
-                               filters);
-  }
-
-  removeOffer(offer);
-}
-
-
 Resources Master::launchTask(const TaskInfo& task,
                              Framework* framework,
                              Slave* slave)

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 99b8181..7649737 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -78,6 +78,7 @@ class WhitelistWatcher;
 struct Framework;
 struct Slave;
 struct Role;
+struct OfferVisitor;
 
 
 class Master : public ProtobufProcess<Master>
@@ -116,7 +117,8 @@ public:
       const FrameworkID& frameworkId,
       const OfferID& offerId,
       const std::vector<TaskInfo>& tasks,
-      const Filters& filters);
+      const Filters& filters,
+      const std::vector<OfferID>& offerIds);
   void reviveOffers(
       const process::UPID& from,
       const FrameworkID& frameworkId);
@@ -205,16 +207,6 @@ protected:
   // Invoked when the contender has entered the contest.
   void contended(const Future<Future<Nothing> >& candidacy);
 
-  // Process a launch tasks request (for a non-cancelled offer) by
-  // launching the desired tasks (if the offer contains a valid set of
-  // tasks) and reporting any unused resources to the allocator.
-  void processTasks(
-      Offer* offer,
-      Framework* framework,
-      Slave* slave,
-      const std::vector<TaskInfo>& tasks,
-      const Filters& filters);
-
   // Reconciles a re-registering slave's tasks / executors and sends
   // TASK_LOST updates for tasks known to the master but unknown to
   // the slave.
@@ -313,6 +305,7 @@ private:
 
   friend struct SlaveRegistrar;
   friend struct SlaveReregistrar;
+  friend struct OfferVisitor;
 
   const Flags flags;
 
@@ -452,10 +445,10 @@ struct Slave
   }
 
   bool hasExecutor(const FrameworkID& frameworkId,
-		   const ExecutorID& executorId)
+		   const ExecutorID& executorId) const
   {
     return executors.contains(frameworkId) &&
-      executors[frameworkId].contains(executorId);
+      executors.get(frameworkId).get().contains(executorId);
   }
 
   void addExecutor(const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 1f264d5..922a8c4 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -155,9 +155,10 @@ message ResourceOffersMessage {
 
 message LaunchTasksMessage {
   required FrameworkID framework_id = 1;
-  required OfferID offer_id = 2;
+  optional OfferID offer_id = 2; // Deprecated.
   repeated TaskInfo tasks = 3;
   required Filters filters = 5;
+  repeated OfferID offer_ids = 6;
 }
 
 

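Because 'offer_id' is now optional and 'offer_ids' is repeated, a
driver can stay compatible with old masters by filling both fields
when only one offer is involved (as sched.cpp does below). A hedged
sketch; the helper name is hypothetical and the include path is
assumed:

    #include <vector>

    #include "messages/messages.hpp" // Assumed generated-header path.

    using namespace mesos;
    using namespace mesos::internal;

    void setOfferIds(
        LaunchTasksMessage* message,
        const std::vector<OfferID>& offerIds)
    {
      for (size_t i = 0; i < offerIds.size(); i++) {
        message->add_offer_ids()->MergeFrom(offerIds[i]);
      }

      // Old masters only read the (now optional) singular field, so
      // set it whenever the request is unambiguous.
      if (offerIds.size() == 1) {
        message->mutable_offer_id()->MergeFrom(offerIds[0]);
      }
    }
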
http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/python/native/mesos_scheduler_driver_impl.cpp
----------------------------------------------------------------------
diff --git a/src/python/native/mesos_scheduler_driver_impl.cpp b/src/python/native/mesos_scheduler_driver_impl.cpp
index 059ed5d..ff5c749 100644
--- a/src/python/native/mesos_scheduler_driver_impl.cpp
+++ b/src/python/native/mesos_scheduler_driver_impl.cpp
@@ -387,20 +387,41 @@ PyObject* MesosSchedulerDriverImpl_launchTasks(MesosSchedulerDriverImpl* self,
     return NULL;
   }
 
-  PyObject* offerIdObj = NULL;
+  PyObject* offerIdsObj = NULL;
   PyObject* tasksObj = NULL;
   PyObject* filtersObj = NULL;
-  OfferID offerId;
+  vector<OfferID> offerIds;
   vector<TaskInfo> tasks;
   Filters filters;
 
-  if (!PyArg_ParseTuple(args, "OO|O", &offerIdObj, &tasksObj, &filtersObj)) {
+  if (!PyArg_ParseTuple(args, "OO|O", &offerIdsObj, &tasksObj, &filtersObj)) {
     return NULL;
   }
 
-  if (!readPythonProtobuf(offerIdObj, &offerId)) {
-    PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID");
-    return NULL;
+  // The offer argument can be a list of offer ids or a single offer
+  // id (for backward compatibility).
+  if (!PyList_Check(offerIdsObj)) {
+    OfferID offerId;
+    if (!readPythonProtobuf(offerIdsObj, &offerId)) {
+      PyErr_Format(PyExc_Exception, "Could not deserialize Python OfferID");
+      return NULL;
+    }
+    offerIds.push_back(offerId);
+  } else {
+    Py_ssize_t len = PyList_Size(offerIdsObj);
+    for (int i = 0; i < len; i++) {
+      PyObject* offerObj = PyList_GetItem(offerIdsObj, i);
+      if (offerObj == NULL) {
+        return NULL;
+      }
+      OfferID offerId;
+      if (!readPythonProtobuf(offerObj, &offerId)) {
+        PyErr_Format(PyExc_Exception,
+                     "Could not deserialize Python OfferID");
+        return NULL;
+      }
+      offerIds.push_back(offerId);
+    }
   }
 
   if (!PyList_Check(tasksObj)) {
@@ -430,7 +451,7 @@ PyObject* MesosSchedulerDriverImpl_launchTasks(MesosSchedulerDriverImpl* self,
     }
   }
 
-  Status status = self->driver->launchTasks(offerId, tasks, filters);
+  Status status = self->driver->launchTasks(offerIds, tasks, filters);
   return PyInt_FromLong(status); // Sets exception if creating long fails.
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index f9028e8..77588c3 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -789,7 +789,7 @@ protected:
     send(master.get(), message);
   }
 
-  void launchTasks(const OfferID& offerId,
+  void launchTasks(const vector<OfferID>& offerIds,
                    const vector<TaskInfo>& tasks,
                    const Filters& filters)
   {
@@ -871,28 +871,42 @@ protected:
 
     LaunchTasksMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
-    message.mutable_offer_id()->MergeFrom(offerId);
     message.mutable_filters()->MergeFrom(filters);
 
-    foreach (const TaskInfo& task, result) {
-      // Keep only the slave PIDs where we run tasks so we can send
-      // framework messages directly.
-      if (savedOffers.count(offerId) > 0) {
-        if (savedOffers[offerId].count(task.slave_id()) > 0) {
-          savedSlavePids[task.slave_id()] =
-            savedOffers[offerId][task.slave_id()];
+    foreach (const OfferID& offerId, offerIds) {
+      message.add_offer_ids()->MergeFrom(offerId);
+
+      foreach (const TaskInfo& task, result) {
+        // Keep only the slave PIDs where we run tasks so we can send
+        // framework messages directly.
+        if (savedOffers.count(offerId) > 0) {
+          if (savedOffers[offerId].count(task.slave_id()) > 0) {
+            savedSlavePids[task.slave_id()] =
+              savedOffers[offerId][task.slave_id()];
+          } else {
+            VLOG(1) << "Attempting to launch task " << task.task_id()
+                    << " with the wrong slave id " << task.slave_id();
+          }
         } else {
-          VLOG(1) << "Attempting to launch a task with the wrong slave id";
+          VLOG(1) << "Attempting to launch task " << task.task_id()
+                  << " with an unknown offer " << offerId;
         }
-      } else {
-        VLOG(1) << "Attempting to launch a task with an unknown offer";
+
+        // Remove the offer since we saved all the PIDs we might use.
+        savedOffers.erase(offerId);
       }
+    }
 
-      message.add_tasks()->MergeFrom(task);
+    // During an upgrade, frameworks using the new driver could send
+    // the new launch tasks protobuf to old masters. To ensure support
+    // during this period, we set the offer id field.
+    if (offerIds.size() == 1) {
+      message.mutable_offer_id()->MergeFrom(offerIds[0]);
     }
 
-    // Remove the offer since we saved all the PIDs we might use.
-    savedOffers.erase(offerId);
+    foreach (const TaskInfo& task, result) {
+      message.add_tasks()->MergeFrom(task);
+    }
 
     CHECK_SOME(master);
     send(master.get(), message);
@@ -1334,6 +1348,18 @@ Status MesosSchedulerDriver::launchTasks(
     const vector<TaskInfo>& tasks,
     const Filters& filters)
 {
+  vector<OfferID> offerIds;
+  offerIds.push_back(offerId);
+
+  return launchTasks(offerIds, tasks, filters);
+}
+
+
+Status MesosSchedulerDriver::launchTasks(
+    const vector<OfferID>& offerIds,
+    const vector<TaskInfo>& tasks,
+    const Filters& filters)
+{
   Lock lock(&mutex);
 
   if (status != DRIVER_RUNNING) {
@@ -1342,7 +1368,7 @@ Status MesosSchedulerDriver::launchTasks(
 
   CHECK(process != NULL);
 
-  dispatch(process, &SchedulerProcess::launchTasks, offerId, tasks, filters);
+  dispatch(process, &SchedulerProcess::launchTasks, offerIds, tasks, filters);
 
   return status;
 }
@@ -1352,7 +1378,10 @@ Status MesosSchedulerDriver::declineOffer(
     const OfferID& offerId,
     const Filters& filters)
 {
-  return launchTasks(offerId, vector<TaskInfo>(), filters);
+  vector<OfferID> offerIds;
+  offerIds.push_back(offerId);
+
+  return launchTasks(offerIds, vector<TaskInfo>(), filters);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index f1486ce..815149a 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1106,6 +1106,305 @@ TEST_F(MasterTest, ReconcileTaskTest)
 }
 
 
+// This test ensures that two offers from the same slave can be used
+// for a single task. First, a single task using half of the available
+// resources is launched; the master then sends a subsequent offer for
+// the remaining resources. The first task is killed, producing an offer
+// for its resources. Two offers covering all slave resources are then
+// outstanding, and a single task should be able to run on them.
+TEST_F(MasterTest, LaunchCombinedOfferTest)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  // The CPU granularity is 1.0 which means that we need slaves with at least
+  // 2 cpus for a combined offer.
+  Resources halfSlave = Resources::parse("cpus:1;mem:512").get();
+  Resources fullSlave = halfSlave + halfSlave;
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Try<PID<Slave> > slave = StartSlave(&isolator, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Get the 1st offer; use half the slave resources to trigger a 2nd offer.
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  Resources resources1(offers1.get()[0].resources());
+  EXPECT_EQ(2, resources1.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources1.mem().get());
+
+  TaskInfo task1;
+  task1.set_name("");
+  task1.mutable_task_id()->set_value("1");
+  task1.mutable_slave_id()->MergeFrom(offers1.get()[0].slave_id());
+  task1.mutable_resources()->MergeFrom(halfSlave);
+  task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  vector<TaskInfo> tasks1;
+  tasks1.push_back(task1);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1));
+
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2));
+
+  // We want to be notified immediately about the new offer.
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  driver.launchTasks(offers1.get()[0].id(), tasks1, filters);
+
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_RUNNING, status1.get().state());
+
+  // Await 2nd offer.
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+
+  Resources resources2(offers2.get()[0].resources());
+  EXPECT_EQ(1, resources2.cpus().get());
+  EXPECT_EQ(Megabytes(512), resources2.mem().get());
+
+  Future<TaskStatus> status2;
+  EXPECT_CALL(exec, killTask(_, _))
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status2));
+
+  Future<vector<Offer> > offers3;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers3));
+
+  // Kill 1st task.
+  TaskID taskId1 = task1.task_id();
+  driver.killTask(taskId1);
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_KILLED, status2.get().state());
+
+  // Await 3rd offer - 2nd and 3rd offers to the same slave are now ready.
+  AWAIT_READY(offers3);
+  EXPECT_NE(0u, offers3.get().size());
+  Resources resources3(offers3.get()[0].resources());
+  EXPECT_EQ(1, resources3.cpus().get());
+  EXPECT_EQ(Megabytes(512), resources3.mem().get());
+
+  TaskInfo task2;
+  task2.set_name("");
+  task2.mutable_task_id()->set_value("2");
+  task2.mutable_slave_id()->MergeFrom(offers2.get()[0].slave_id());
+  task2.mutable_resources()->MergeFrom(fullSlave);
+  task2.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks2;
+  tasks2.push_back(task2);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status3;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status3));
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers2.get()[0].id());
+  combinedOffers.push_back(offers3.get()[0].id());
+
+  driver.launchTasks(combinedOffers, tasks2);
+
+  AWAIT_READY(status3);
+  EXPECT_EQ(TASK_RUNNING, status3.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
+
+
+// Test ensures that offers for launchTasks cannot span multiple slaves.
+TEST_F(MasterTest, LaunchAcrossSlavesTest)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+  Resources twoSlaves = fullSlave + fullSlave;
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Try<PID<Slave> > slave1 = StartSlave(&isolator, flags);
+  ASSERT_SOME(slave1);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  Resources resources1(offers1.get()[0].resources());
+  EXPECT_EQ(2, resources1.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources1.mem().get());
+
+  // Test that offers cannot span multiple slaves.
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2));
+
+  Try<PID<Slave> > slave2 = StartSlave(&isolator, flags);
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  Resources resources2(offers2.get()[0].resources());
+  EXPECT_EQ(2, resources2.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources2.mem().get());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers1.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(twoSlaves);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers1.get()[0].id());
+  combinedOffers.push_back(offers2.get()[0].id());
+
+  driver.launchTasks(combinedOffers, tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
+
+
+// Test ensures that an offer cannot appear more than once in offers
+// for launchTasks.
+TEST_F(MasterTest, LaunchDuplicateOfferTest)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestingIsolator isolator(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Try<PID<Slave> > slave = StartSlave(&isolator, flags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Test that the same offer cannot be used more than once in a
+  // single launchTasks() call.
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Resources resources(offers.get()[0].resources());
+  EXPECT_EQ(2, resources.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources.mem().get());
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers.get()[0].id());
+  combinedOffers.push_back(offers.get()[0].id());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(fullSlave);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<TaskStatus> status;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(combinedOffers, tasks);
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_LOST, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
+
+
 #ifdef MESOS_HAS_JAVA
 class MasterZooKeeperTest : public MesosTest
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b609c851/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index 9beb949..cf910e5 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -199,7 +199,7 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources)
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
   EXPECT_TRUE(status.get().has_message());
-  EXPECT_EQ("Task uses invalid resources", status.get().message());
+  EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
 
   driver.stop();
   driver.join();
@@ -422,7 +422,7 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
   EXPECT_EQ(task.task_id(), status.get().task_id());
   EXPECT_EQ(TASK_LOST, status.get().state());
   EXPECT_TRUE(status.get().has_message());
-  EXPECT_EQ("Task uses invalid resources", status.get().message());
+  EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message());
 
   MockScheduler sched2;
   MesosSchedulerDriver driver2(